
Building a Data Pipeline Skill

An ETL-style data pipeline skill with validation stages and human-in-the-loop confirmation, demonstrating multi-step workflows and safety patterns in Python.

Data pipelines are a natural fit for agent skills. They have discrete stages (extract, transform, load), each with clear inputs and outputs, and they often need human approval before doing anything destructive. This example builds an ETL-style skill in Python that validates data at every stage and pauses for human confirmation before writing to a destination.

If you are new to skill design, read the “what are agent skills” guide first. For the basics of tool definitions and parameter design, see the file search example. This article builds on those ideas with multi-step workflows, human-in-the-loop patterns, and error recovery.

What we’re building

A skill that:

  1. Extracts data from a source (CSV file, JSON API, or database query)
  2. Validates the extracted data against a schema
  3. Transforms it (type coercion, filtering, computed columns)
  4. Confirms with the user before loading (human-in-the-loop)
  5. Loads the data to a destination (database table, file, or API)

The main design constraint is safety. The skill must never silently write bad data or overwrite existing data without explicit human approval. This matters even more for agent-driven pipelines than for traditional ones, because the agent is constructing queries and transformations from natural language instructions that may be ambiguous.

The skill definition

DATA_PIPELINE_SKILL = {
    "name": "run_data_pipeline",
    "description": (
        "Extract data from a source, validate and transform it, then load "
        "it to a destination. Supports CSV files, JSON APIs, and SQL databases "
        "as sources and destinations.\n\n"
        "IMPORTANT: This skill will pause and ask for human confirmation "
        "before writing any data to the destination. It will show a preview "
        "of what will be written and wait for approval.\n\n"
        "Use this when you need to move data between systems with validation. "
        "Do NOT use this for simple file reads — use a file reader skill "
        "instead. Do NOT use this for analytical queries that don't need "
        "to write results anywhere — use a query skill instead."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "source": {
                "type": "object",
                "description": "Where to read data from",
                "properties": {
                    "type": {"type": "string", "enum": ["csv", "json_api", "sql"]},
                    "uri": {
                        "type": "string",
                        "description": (
                            "File path, URL, or connection string. "
                            "Examples: './data/input.csv', "
                            "'https://api.example.com/data', "
                            "'postgresql://host/db'"
                        ),
                    },
                    "query": {
                        "type": "string",
                        "description": "SQL query (required when type is 'sql')",
                    },
                },
                "required": ["type", "uri"],
            },
            "destination": {
                "type": "object",
                "description": "Where to write the transformed data",
                "properties": {
                    "type": {"type": "string", "enum": ["csv", "sql", "json"]},
                    "uri": {"type": "string", "description": "File path or connection string"},
                    "table": {
                        "type": "string",
                        "description": "Target table name (required when type is 'sql')",
                    },
                    "mode": {
                        "type": "string",
                        "enum": ["append", "replace", "upsert"],
                        "description": "Write mode. Defaults to 'append'.",
                    },
                },
                "required": ["type", "uri"],
            },
            "validations": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "column": {"type": "string"},
                        "rule": {
                            "type": "string",
                            "enum": [
                                "not_null",
                                "unique",
                                "min",
                                "max",
                                "regex",
                                "one_of",
                            ],
                        },
                        "value": {
                            "type": "string",
                            "description": "Rule parameter (e.g., min value, regex pattern, comma-separated list for one_of)",
                        },
                    },
                },
                "description": "Optional validation rules to apply to the extracted data",
            },
            "transforms": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "type": {
                            "type": "string",
                            "enum": [
                                "rename_column",
                                "cast_type",
                                "filter_rows",
                                "add_computed",
                                "drop_column",
                            ],
                        },
                        "params": {"type": "object"},
                    },
                },
                "description": "Optional transformations to apply in order",
            },
        },
        "required": ["source", "destination"],
    },
}

The parameter schema is verbose on purpose. When a skill accepts complex inputs, the agent relies on the schema to construct valid calls. Every enum value, description, and example reduces the chance of malformed parameters. This is the same idea from the tool use patterns guide, taken further because the input shape is more complex.
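When a request like “copy active users from the CSV export into the analytics table” comes in, the agent might construct a call along these lines. The file path, connection string, table, and column names here are all illustrative:

```python
# A hypothetical argument payload for run_data_pipeline. Every value
# below is made up for illustration; only the shape matches the schema.
pipeline_args = {
    "source": {"type": "csv", "uri": "./data/users_export.csv"},
    "destination": {
        "type": "sql",
        "uri": "postgresql://localhost/analytics",
        "table": "active_users",
        "mode": "append",
    },
    "validations": [
        {"column": "email", "rule": "not_null"},
        {"column": "status", "rule": "one_of", "value": "active,trial"},
    ],
    "transforms": [
        {"type": "filter_rows",
         "params": {"column": "status", "operator": "eq", "value": "active"}},
    ],
}

# The schema's top-level required fields must be present before dispatch.
assert all(k in pipeline_args for k in ("source", "destination"))
```

The richer the schema descriptions, the more likely the agent lands on a payload like this on the first try instead of iterating through malformed calls.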

Core data types

from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum
from typing import Any


class PipelineStage(Enum):
    EXTRACT = "extract"
    VALIDATE = "validate"
    TRANSFORM = "transform"
    CONFIRM = "confirm"
    LOAD = "load"


class StageStatus(Enum):
    PENDING = "pending"
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"
    AWAITING_CONFIRMATION = "awaiting_confirmation"


@dataclass
class StageResult:
    stage: PipelineStage
    status: StageStatus
    message: str
    row_count: int | None = None
    errors: list[str] = field(default_factory=list)
    warnings: list[str] = field(default_factory=list)
    preview: list[dict[str, Any]] | None = None


@dataclass
class PipelineResult:
    success: bool
    stages: list[StageResult]
    suggestion: str | None = None

    def to_dict(self) -> dict[str, Any]:
        """Serialize for return to the agent."""
        return {
            "success": self.success,
            "stages": [
                {
                    "stage": s.stage.value,
                    "status": s.status.value,
                    "message": s.message,
                    "row_count": s.row_count,
                    "errors": s.errors,
                    "warnings": s.warnings,
                    "preview": s.preview,
                }
                for s in self.stages
            ],
            "suggestion": self.suggestion,
        }

Every stage produces a StageResult with a status, human-readable message, and optional error list. This tells the agent exactly where and why a pipeline failed. A flat error message like “pipeline failed” would force the agent to guess which stage broke, making recovery much harder.

The preview field on StageResult is for the human-in-the-loop pattern: we populate it with sample rows so the user can inspect what will be written before approving.
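For concreteness, here is roughly what a serialized PipelineResult looks like after a run that failed validation. The values are illustrative, but the field names mirror to_dict above:

```python
import json

# Illustrative to_dict() output for a run where extraction succeeded
# and validation failed. The agent reads this structure directly.
result_payload = {
    "success": False,
    "stages": [
        {"stage": "extract", "status": "success",
         "message": "Extracted 120 rows from ./data/input.csv",
         "row_count": 120, "errors": [], "warnings": [], "preview": None},
        {"stage": "validate", "status": "failed",
         "message": "Validation failed with 2 error(s)",
         "row_count": 120,
         "errors": ["Row 7: column 'email' is null or empty",
                    "Row 52: column 'email' is null or empty"],
         "warnings": [], "preview": None},
    ],
    "suggestion": None,
}

print(json.dumps(result_payload, indent=2))
```

Because each error names a row and a column, the agent can report the problem precisely or adjust its validation rules, rather than guessing at what "failed" means.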

Stage 1: extraction

import csv
import io
from pathlib import Path
from urllib.parse import urlparse

import httpx


async def extract_data(
    source_type: str,
    uri: str,
    query: str | None = None,
) -> tuple[list[dict[str, Any]], StageResult]:
    """
    Extract data from the configured source.

    Returns the extracted rows and a StageResult describing
    what happened. On failure, returns an empty list and a
    StageResult with status FAILED.
    """
    try:
        if source_type == "csv":
            return await _extract_csv(uri)
        elif source_type == "json_api":
            return await _extract_json_api(uri)
        elif source_type == "sql":
            return await _extract_sql(uri, query)
        else:
            return [], StageResult(
                stage=PipelineStage.EXTRACT,
                status=StageStatus.FAILED,
                message=f"Unknown source type: {source_type}",
                errors=[f"Supported types: csv, json_api, sql"],
            )
    except Exception as exc:
        return [], StageResult(
            stage=PipelineStage.EXTRACT,
            status=StageStatus.FAILED,
            message=f"Extraction failed: {exc}",
            errors=[str(exc)],
        )


async def _extract_csv(uri: str) -> tuple[list[dict[str, Any]], StageResult]:
    path = Path(uri)
    if not path.exists():
        return [], StageResult(
            stage=PipelineStage.EXTRACT,
            status=StageStatus.FAILED,
            message=f"CSV file not found: {uri}",
            errors=[f"File does not exist: {uri}"],
        )

    # Guard against excessively large files
    file_size = path.stat().st_size
    max_size = 100 * 1024 * 1024  # 100MB
    if file_size > max_size:
        return [], StageResult(
            stage=PipelineStage.EXTRACT,
            status=StageStatus.FAILED,
            message=f"CSV file too large ({file_size / 1024 / 1024:.1f}MB). Max: 100MB.",
            errors=["File exceeds maximum size. Consider filtering at the source."],
        )

    content = path.read_text(encoding="utf-8")
    reader = csv.DictReader(io.StringIO(content))
    rows = list(reader)

    return rows, StageResult(
        stage=PipelineStage.EXTRACT,
        status=StageStatus.SUCCESS,
        message=f"Extracted {len(rows)} rows from {uri}",
        row_count=len(rows),
    )


async def _extract_json_api(uri: str) -> tuple[list[dict[str, Any]], StageResult]:
    # Validate URL scheme to prevent file:// or other protocol attacks
    parsed = urlparse(uri)
    if parsed.scheme not in ("http", "https"):
        return [], StageResult(
            stage=PipelineStage.EXTRACT,
            status=StageStatus.FAILED,
            message=f"Invalid URL scheme: {parsed.scheme}. Only http/https allowed.",
            errors=["URL must use http or https protocol"],
        )

    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.get(uri)
        response.raise_for_status()
        data = response.json()

    # Handle both array responses and { "data": [...] } wrappers
    if isinstance(data, list):
        rows = data
    elif isinstance(data, dict) and "data" in data and isinstance(data["data"], list):
        rows = data["data"]
    else:
        return [], StageResult(
            stage=PipelineStage.EXTRACT,
            status=StageStatus.FAILED,
            message="API response is not a list or {data: [...]} wrapper",
            errors=["Expected array or object with 'data' key containing an array"],
        )

    return rows, StageResult(
        stage=PipelineStage.EXTRACT,
        status=StageStatus.SUCCESS,
        message=f"Extracted {len(rows)} rows from API",
        row_count=len(rows),
    )


async def _extract_sql(
    uri: str, query: str | None
) -> tuple[list[dict[str, Any]], StageResult]:
    if not query:
        return [], StageResult(
            stage=PipelineStage.EXTRACT,
            status=StageStatus.FAILED,
            message="SQL source requires a 'query' parameter",
            errors=["Provide a SELECT query to extract data"],
        )

    # Block destructive SQL -- this is a read-only extraction stage
    normalized = query.strip().upper()
    destructive_keywords = ["INSERT", "UPDATE", "DELETE", "DROP", "ALTER", "TRUNCATE"]
    for keyword in destructive_keywords:
        if normalized.startswith(keyword):
            return [], StageResult(
                stage=PipelineStage.EXTRACT,
                status=StageStatus.FAILED,
                message=f"Extraction query must be a SELECT statement, not {keyword}",
                errors=[
                    "Only SELECT queries are allowed during extraction. "
                    "Data writing happens in the load stage."
                ],
            )

    # In a real implementation, use asyncpg or sqlalchemy here.
    # This is a placeholder to demonstrate the pattern.
    import sqlalchemy
    from sqlalchemy import text

    engine = sqlalchemy.create_engine(uri)
    with engine.connect() as conn:
        result = conn.execute(text(query))
        columns = list(result.keys())
        rows = [dict(zip(columns, row)) for row in result.fetchall()]

    return rows, StageResult(
        stage=PipelineStage.EXTRACT,
        status=StageStatus.SUCCESS,
        message=f"Extracted {len(rows)} rows from SQL query",
        row_count=len(rows),
    )

A few safety decisions to note. The CSV extractor enforces a file size limit because agents can and do point skills at unexpectedly large files. The JSON API extractor validates the URL scheme to prevent file:// protocol abuse, where the agent gets tricked into reading local files via a crafted URL. The SQL extractor refuses queries that start with a destructive keyword. That is a simple prefix check, not a full SQL parser, so treat it as one layer of defense rather than a guarantee. All of these are defense-in-depth against agent misuse, whether accidental or adversarial.
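The scheme guard is worth exercising in isolation. A minimal standalone version of the check in _extract_json_api:

```python
from urllib.parse import urlparse

def is_allowed_url(uri: str) -> bool:
    """Mirror of the scheme check in _extract_json_api."""
    return urlparse(uri).scheme in ("http", "https")

assert is_allowed_url("https://api.example.com/data")
assert not is_allowed_url("file:///etc/passwd")   # blocks local file reads
assert not is_allowed_url("ftp://host/data.csv")  # unsupported scheme
```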

Stage 2: validation

import re
from typing import Callable


# Registry of validation rule implementations
VALIDATION_RULES: dict[str, Callable[..., list[str]]] = {}


def register_rule(name: str):
    """Decorator to register a validation rule by name."""
    def decorator(fn: Callable[..., list[str]]):
        VALIDATION_RULES[name] = fn
        return fn
    return decorator


@register_rule("not_null")
def validate_not_null(rows: list[dict], column: str, **_: Any) -> list[str]:
    errors = []
    for i, row in enumerate(rows):
        val = row.get(column)
        if val is None or (isinstance(val, str) and val.strip() == ""):
            errors.append(f"Row {i + 1}: column '{column}' is null or empty")
    return errors


@register_rule("unique")
def validate_unique(rows: list[dict], column: str, **_: Any) -> list[str]:
    seen: dict[Any, int] = {}
    errors = []
    for i, row in enumerate(rows):
        val = row.get(column)
        if val in seen:
            errors.append(
                f"Row {i + 1}: duplicate value '{val}' in column '{column}' "
                f"(first seen in row {seen[val]})"
            )
        else:
            seen[val] = i + 1
    return errors


@register_rule("min")
def validate_min(rows: list[dict], column: str, value: str = "0", **_: Any) -> list[str]:
    threshold = float(value)
    errors = []
    for i, row in enumerate(rows):
        try:
            if float(row.get(column, 0)) < threshold:
                errors.append(
                    f"Row {i + 1}: column '{column}' value {row[column]} "
                    f"is below minimum {threshold}"
                )
        except (ValueError, TypeError):
            errors.append(
                f"Row {i + 1}: column '{column}' value '{row.get(column)}' "
                f"is not numeric"
            )
    return errors


@register_rule("max")
def validate_max(rows: list[dict], column: str, value: str = "0", **_: Any) -> list[str]:
    threshold = float(value)
    errors = []
    for i, row in enumerate(rows):
        try:
            if float(row.get(column, 0)) > threshold:
                errors.append(
                    f"Row {i + 1}: column '{column}' value {row[column]} "
                    f"exceeds maximum {threshold}"
                )
        except (ValueError, TypeError):
            errors.append(
                f"Row {i + 1}: column '{column}' value '{row.get(column)}' "
                f"is not numeric"
            )
    return errors


@register_rule("regex")
def validate_regex(rows: list[dict], column: str, value: str = "", **_: Any) -> list[str]:
    try:
        pattern = re.compile(value)
    except re.error as exc:
        return [f"Invalid regex pattern '{value}': {exc}"]

    errors = []
    for i, row in enumerate(rows):
        val = str(row.get(column, ""))
        if not pattern.match(val):
            errors.append(
                f"Row {i + 1}: column '{column}' value '{val}' "
                f"does not match pattern '{value}'"
            )
    return errors


@register_rule("one_of")
def validate_one_of(rows: list[dict], column: str, value: str = "", **_: Any) -> list[str]:
    allowed = {v.strip() for v in value.split(",")}
    errors = []
    for i, row in enumerate(rows):
        val = str(row.get(column, ""))
        if val not in allowed:
            errors.append(
                f"Row {i + 1}: column '{column}' value '{val}' "
                f"is not one of: {', '.join(sorted(allowed))}"
            )
    return errors


def validate_data(
    rows: list[dict[str, Any]],
    validations: list[dict[str, str]] | None,
) -> StageResult:
    """Run all configured validations against the data."""
    if not validations:
        return StageResult(
            stage=PipelineStage.VALIDATE,
            status=StageStatus.SKIPPED,
            message="No validation rules configured",
            row_count=len(rows),
        )

    all_errors: list[str] = []
    all_warnings: list[str] = []

    for rule_config in validations:
        column = rule_config.get("column", "")
        rule_name = rule_config.get("rule", "")
        rule_value = rule_config.get("value", "")

        rule_fn = VALIDATION_RULES.get(rule_name)
        if rule_fn is None:
            all_warnings.append(
                f"Unknown validation rule '{rule_name}' — skipping. "
                f"Available rules: {', '.join(VALIDATION_RULES.keys())}"
            )
            continue

        # Check that the column actually exists in the data
        if rows and column not in rows[0]:
            all_errors.append(
                f"Column '{column}' not found in data. "
                f"Available columns: {', '.join(rows[0].keys())}"
            )
            continue

        errors = rule_fn(rows, column, value=rule_value)

        # Cap per-rule errors to avoid flooding the output
        max_errors_per_rule = 10
        if len(errors) > max_errors_per_rule:
            all_errors.extend(errors[:max_errors_per_rule])
            all_errors.append(
                f"... and {len(errors) - max_errors_per_rule} more errors "
                f"for rule '{rule_name}' on column '{column}'"
            )
        else:
            all_errors.extend(errors)

    if all_errors:
        return StageResult(
            stage=PipelineStage.VALIDATE,
            status=StageStatus.FAILED,
            message=f"Validation failed with {len(all_errors)} error(s)",
            row_count=len(rows),
            errors=all_errors,
            warnings=all_warnings,
        )

    return StageResult(
        stage=PipelineStage.VALIDATE,
        status=StageStatus.SUCCESS,
        message=f"All validations passed for {len(rows)} rows",
        row_count=len(rows),
        warnings=all_warnings,
    )

The validation rules use a registry pattern. Each rule is a standalone function registered by name, so adding a new one is just a @register_rule("new_name") decorator away. Unknown rule names produce a warning rather than a hard failure, letting the agent correct its parameters on the next attempt.

Error capping at 10 per rule prevents a single check from flooding the agent’s context window. A not_null check on a column with thousands of nulls is no more informative at 10,000 errors than at 10.
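To see the registry pattern end to end, here is a standalone sketch with a hypothetical max_length rule plugged in. The registry and decorator are miniature copies of the ones above, repeated here so the example runs on its own:

```python
from typing import Any, Callable

# Standalone mini-registry, mirroring VALIDATION_RULES above.
RULES: dict[str, Callable[..., list[str]]] = {}

def register_rule(name: str):
    def decorator(fn: Callable[..., list[str]]):
        RULES[name] = fn
        return fn
    return decorator

@register_rule("max_length")
def validate_max_length(rows: list[dict], column: str, value: str = "0",
                        **_: Any) -> list[str]:
    """Hypothetical new rule: flag string values longer than `value` chars."""
    limit = int(value)
    return [
        f"Row {i + 1}: column '{column}' exceeds {limit} characters"
        for i, row in enumerate(rows)
        if len(str(row.get(column, ""))) > limit
    ]

rows = [{"name": "ok"}, {"name": "much too long a value"}]
errors = RULES["max_length"](rows, "name", value="10")
assert errors == ["Row 2: column 'name' exceeds 10 characters"]
```

Registering the rule is the whole integration; validate_data picks it up by name with no other changes.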

Stage 3: transformation

def apply_transforms(
    rows: list[dict[str, Any]],
    transforms: list[dict[str, Any]] | None,
) -> tuple[list[dict[str, Any]], StageResult]:
    """Apply transformations in order, returning the modified rows."""
    if not transforms:
        return rows, StageResult(
            stage=PipelineStage.TRANSFORM,
            status=StageStatus.SKIPPED,
            message="No transforms configured",
            row_count=len(rows),
        )

    warnings: list[str] = []
    current_rows = [dict(row) for row in rows]  # Copy each row; never mutate the caller's data

    for i, transform in enumerate(transforms):
        t_type = transform.get("type", "")
        params = transform.get("params", {})

        try:
            if t_type == "rename_column":
                old_name = params["from"]
                new_name = params["to"]
                current_rows = [
                    {(new_name if k == old_name else k): v for k, v in row.items()}
                    for row in current_rows
                ]

            elif t_type == "cast_type":
                column = params["column"]
                target = params["target"]  # "int", "float", "str", "bool"
                casters = {
                    "int": int, "float": float,
                    "str": str, "bool": lambda v: str(v).lower() in ("true", "1", "yes"),
                }
                caster = casters.get(target)
                if not caster:
                    warnings.append(
                        f"Transform {i + 1}: unknown cast target '{target}'. "
                        f"Supported: {', '.join(casters.keys())}"
                    )
                    continue
                cast_errors = 0
                for row in current_rows:
                    try:
                        row[column] = caster(row[column])
                    except (ValueError, TypeError, KeyError):
                        cast_errors += 1
                if cast_errors > 0:
                    warnings.append(
                        f"Transform {i + 1}: {cast_errors} rows failed to cast "
                        f"column '{column}' to {target}"
                    )

            elif t_type == "filter_rows":
                column = params["column"]
                operator = params["operator"]  # "eq", "neq", "gt", "lt", "contains"
                value = params["value"]
                before_count = len(current_rows)
                current_rows = _filter_rows(current_rows, column, operator, value)
                filtered = before_count - len(current_rows)
                if filtered > 0:
                    warnings.append(
                        f"Transform {i + 1}: filtered out {filtered} rows "
                        f"({len(current_rows)} remaining)"
                    )

            elif t_type == "add_computed":
                new_col = params["column"]
                expression = params["expression"]
                # We use a restricted eval with only math operations
                # for safety. A production system should use a proper
                # expression parser.
                for row in current_rows:
                    row[new_col] = _evaluate_expression(expression, row)

            elif t_type == "drop_column":
                column = params["column"]
                current_rows = [
                    {k: v for k, v in row.items() if k != column}
                    for row in current_rows
                ]

            else:
                warnings.append(f"Transform {i + 1}: unknown type '{t_type}', skipping")

        except KeyError as exc:
            return rows, StageResult(
                stage=PipelineStage.TRANSFORM,
                status=StageStatus.FAILED,
                message=f"Transform {i + 1} ({t_type}) failed: missing parameter {exc}",
                row_count=len(rows),
                errors=[f"Missing required parameter: {exc}"],
                warnings=warnings,
            )
        except Exception as exc:
            return rows, StageResult(
                stage=PipelineStage.TRANSFORM,
                status=StageStatus.FAILED,
                message=f"Transform {i + 1} ({t_type}) failed: {exc}",
                row_count=len(rows),
                errors=[str(exc)],
                warnings=warnings,
            )

    return current_rows, StageResult(
        stage=PipelineStage.TRANSFORM,
        status=StageStatus.SUCCESS,
        message=f"Applied {len(transforms)} transform(s), {len(current_rows)} rows remaining",
        row_count=len(current_rows),
        warnings=warnings,
    )


def _filter_rows(
    rows: list[dict], column: str, operator: str, value: Any
) -> list[dict]:
    result = []
    for row in rows:
        cell = row.get(column)
        if operator == "eq" and str(cell) == str(value):
            result.append(row)
        elif operator == "neq" and str(cell) != str(value):
            result.append(row)
        elif operator == "gt" and float(cell) > float(value):
            result.append(row)
        elif operator == "lt" and float(cell) < float(value):
            result.append(row)
        elif operator == "contains" and str(value) in str(cell):
            result.append(row)
    return result


def _evaluate_expression(expression: str, row: dict[str, Any]) -> Any:
    """
    Evaluate a simple arithmetic expression with column references.

    This is deliberately restrictive. Column names are substituted
    as variables, and only basic arithmetic is allowed. A production
    system should use a sandboxed expression evaluator.
    """
    # Substitute column references
    local_vars = {k: _try_numeric(v) for k, v in row.items()}

    # Restrict to safe builtins
    safe_builtins = {"abs": abs, "round": round, "min": min, "max": max}
    return eval(expression, {"__builtins__": safe_builtins}, local_vars)


def _try_numeric(value: Any) -> Any:
    """Attempt to convert a value to a number for expression evaluation."""
    try:
        return int(value)
    except (ValueError, TypeError):
        try:
            return float(value)
        except (ValueError, TypeError):
            return value

The per-row copy at the top of apply_transforms is intentional. Transforms modify rows in place for performance, but we never want to mutate the input data (dict(row) is a shallow copy of each row, which is enough because rows hold flat scalar values). If a transform fails halfway through, we return the original rows alongside the error. The agent (or user) can retry with corrected parameters without losing data.

On _evaluate_expression: yes, we use eval. This is a real tradeoff. Sandboxing eval by restricting __builtins__ is not bulletproof, and a production system should use a proper expression parser like asteval or simpleeval. We document the limitation explicitly so anyone building on this example knows what to replace. The restricted builtins at least prevent the most obvious attacks (import, exec, file access).
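If you want to drop eval entirely without pulling in a dependency, a small whitelist walker over the standard library's ast module covers the same arithmetic subset. This is a sketch, not a hardened evaluator:

```python
import ast
import operator

# Only these binary operators are allowed; everything else raises.
_OPS = {
    ast.Add: operator.add, ast.Sub: operator.sub,
    ast.Mult: operator.mul, ast.Div: operator.truediv,
}

def safe_eval(expression: str, variables: dict) -> float:
    """Evaluate basic arithmetic over column values, nothing else."""
    def walk(node):
        if isinstance(node, ast.Expression):
            return walk(node.body)
        if isinstance(node, ast.BinOp) and type(node.op) in _OPS:
            return _OPS[type(node.op)](walk(node.left), walk(node.right))
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.Name) and node.id in variables:
            return variables[node.id]
        raise ValueError(f"Disallowed expression element: {type(node).__name__}")
    return walk(ast.parse(expression, mode="eval"))

assert safe_eval("price * quantity", {"price": 2.5, "quantity": 4}) == 10.0
```

Anything outside the whitelist, including function calls, attribute access, and unknown names, raises before any evaluation happens, which is the property eval with restricted builtins cannot fully guarantee.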

Stage 4: human-in-the-loop confirmation

This is the most important stage from a safety perspective. Before writing any data, the skill pauses and asks for human approval.

def build_confirmation_request(
    rows: list[dict[str, Any]],
    destination: dict[str, str],
    stage_results: list[StageResult],
) -> PipelineResult:
    """
    Build a result that pauses the pipeline and requests human
    confirmation before proceeding to the load stage.

    The agent should present the preview to the user and only
    continue if the user explicitly approves.
    """
    # Build a preview of the first few rows
    preview_rows = rows[:5]
    total = len(rows)

    dest_desc = f"{destination['type']}:{destination['uri']}"
    if destination.get("table"):
        dest_desc += f" (table: {destination['table']})"

    mode = destination.get("mode", "append")

    confirm_stage = StageResult(
        stage=PipelineStage.CONFIRM,
        status=StageStatus.AWAITING_CONFIRMATION,
        message=(
            f"Ready to {mode} {total} rows to {dest_desc}. "
            f"Preview of first {len(preview_rows)} rows is attached. "
            "Please confirm to proceed."
        ),
        row_count=total,
        preview=preview_rows,
        warnings=(
            [f"Mode is '{mode}' — this will overwrite existing data"]
            if mode == "replace"
            else []
        ),
    )

    return PipelineResult(
        success=True,  # Pipeline succeeded so far; awaiting confirmation
        stages=[*stage_results, confirm_stage],
        suggestion=(
            "Present the preview to the user and ask for confirmation. "
            "If approved, call run_data_pipeline_load to complete the pipeline. "
            "If rejected, no data will be written."
        ),
    )

The AWAITING_CONFIRMATION status is distinct from both SUCCESS and FAILED. The pipeline isn’t complete or broken; it’s deliberately paused. The suggestion field tells the agent exactly what to do: show the preview, ask the user, and call a separate load function if approved.

This two-call pattern (review then confirm) is a basic safety mechanism for any skill that does destructive operations. The code review skill uses a similar approach: analyze first, then let the agent decide what to do with the results.
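On the agent side, handling the paused state can be as simple as branching on the stage statuses. A hypothetical helper, assuming the to_dict shape from earlier, where approval itself always comes from the human:

```python
def next_action(result: dict) -> str:
    """Decide what to do with a serialized PipelineResult.

    This helper never grants approval itself; it only routes the
    awaiting_confirmation state to the human.
    """
    statuses = [s["status"] for s in result["stages"]]
    if "failed" in statuses:
        return "report_failure"
    if statuses and statuses[-1] == "awaiting_confirmation":
        return "ask_user_for_approval"
    return "report_success"

paused = {"stages": [
    {"status": "success"},
    {"status": "awaiting_confirmation"},
]}
assert next_action(paused) == "ask_user_for_approval"
```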

Stage 5: loading

async def load_data(
    rows: list[dict[str, Any]],
    destination: dict[str, str],
) -> StageResult:
    """Write validated and transformed data to the destination."""
    dest_type = destination["type"]
    uri = destination["uri"]
    mode = destination.get("mode", "append")

    try:
        if dest_type == "csv":
            return await _load_csv(rows, uri, mode)
        elif dest_type == "sql":
            table = destination.get("table")
            if not table:
                return StageResult(
                    stage=PipelineStage.LOAD,
                    status=StageStatus.FAILED,
                    message="SQL destination requires a 'table' parameter",
                    errors=["Specify the target table name"],
                )
            return await _load_sql(rows, uri, table, mode)
        elif dest_type == "json":
            return await _load_json(rows, uri, mode)
        else:
            return StageResult(
                stage=PipelineStage.LOAD,
                status=StageStatus.FAILED,
                message=f"Unknown destination type: {dest_type}",
                errors=[f"Supported destination types: csv, sql, json"],
            )
    except Exception as exc:
        return StageResult(
            stage=PipelineStage.LOAD,
            status=StageStatus.FAILED,
            message=f"Load failed: {exc}",
            errors=[str(exc)],
        )


async def _load_csv(
    rows: list[dict[str, Any]], uri: str, mode: str
) -> StageResult:
    path = Path(uri)

    # For append mode, check if file exists and has compatible headers
    if mode == "append" and path.exists():
        existing = path.read_text(encoding="utf-8").splitlines()
        if existing:
            existing_headers = existing[0].split(",")
            new_headers = list(rows[0].keys()) if rows else []
            if existing_headers != new_headers:
                return StageResult(
                    stage=PipelineStage.LOAD,
                    status=StageStatus.FAILED,
                    message="Column mismatch with existing CSV",
                    errors=[
                        f"Existing columns: {existing_headers}",
                        f"New columns: {new_headers}",
                    ],
                )

    write_mode = "a" if mode == "append" and path.exists() else "w"
    write_header = write_mode == "w"

    with open(path, write_mode, newline="") as f:
        if rows:
            writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
            if write_header:
                writer.writeheader()
            writer.writerows(rows)

    return StageResult(
        stage=PipelineStage.LOAD,
        status=StageStatus.SUCCESS,
        message=f"Wrote {len(rows)} rows to {uri} (mode: {mode})",
        row_count=len(rows),
    )


async def _load_json(
    rows: list[dict[str, Any]], uri: str, mode: str
) -> StageResult:
    import json

    path = Path(uri)

    # Track the new-row count before any merge: in append mode, `rows`
    # is extended with the existing array, so len(rows) afterwards would
    # over-report what this call actually wrote.
    new_count = len(rows)

    if mode == "append" and path.exists():
        existing = json.loads(path.read_text())
        if not isinstance(existing, list):
            return StageResult(
                stage=PipelineStage.LOAD,
                status=StageStatus.FAILED,
                message="Existing JSON file is not an array — cannot append",
                errors=["Append mode requires the existing file to contain a JSON array"],
            )
        rows = existing + rows

    path.write_text(json.dumps(rows, indent=2, default=str))

    return StageResult(
        stage=PipelineStage.LOAD,
        status=StageStatus.SUCCESS,
        message=f"Wrote {new_count} rows to {uri} (mode: {mode})",
        row_count=new_count,
    )


async def _load_sql(
    rows: list[dict[str, Any]], uri: str, table: str, mode: str
) -> StageResult:
    import sqlalchemy
    from sqlalchemy import text

    # The table name is interpolated into the SQL text, so reject
    # anything that is not a plain identifier; row values travel as
    # bound parameters and are quoted by the driver.
    if not table.isidentifier():
        return StageResult(
            stage=PipelineStage.LOAD,
            status=StageStatus.FAILED,
            message=f"Invalid table name: {table!r}",
            errors=["Table name must be a plain identifier"],
        )

    engine = sqlalchemy.create_engine(uri)

    with engine.begin() as conn:
        if mode == "replace":
            conn.execute(text(f"DELETE FROM {table}"))

        if rows:
            columns = list(rows[0].keys())
            placeholders = ", ".join(f":{col}" for col in columns)
            col_list = ", ".join(columns)
            stmt = text(f"INSERT INTO {table} ({col_list}) VALUES ({placeholders})")
            # A list of parameter dicts triggers SQLAlchemy's executemany path.
            conn.execute(stmt, rows)

    return StageResult(
        stage=PipelineStage.LOAD,
        status=StageStatus.SUCCESS,
        message=f"Loaded {len(rows)} rows into {table} (mode: {mode})",
        row_count=len(rows),
    )

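The insert above relies on SQLAlchemy's executemany behavior: passing a list of parameter dicts runs the statement once per row, with values bound as parameters rather than spliced into the SQL string. The same pattern using only the standard library's sqlite3 module looks like this (a self-contained sketch, not the skill's code; the `users` table and its columns are hardcoded for illustration):

```python
import sqlite3

rows = [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]
columns = list(rows[0].keys())

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, age INTEGER)")

# Values travel as named parameters; only identifiers are interpolated
# into the SQL string, which is why the skill validates them separately.
placeholders = ", ".join(f":{col}" for col in columns)
stmt = f"INSERT INTO users ({', '.join(columns)}) VALUES ({placeholders})"
conn.executemany(stmt, rows)

count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
print(count)  # 2
```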
Composing the full pipeline

async def run_data_pipeline(
    source: dict[str, str],
    destination: dict[str, str],
    validations: list[dict[str, str]] | None = None,
    transforms: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
    """
    Entry point for the data pipeline skill.

    Runs extract -> validate -> transform -> confirm.
    Does NOT load — loading requires a separate call after
    human confirmation.
    """
    stages: list[StageResult] = []

    # Stage 1: Extract
    rows, extract_result = await extract_data(
        source["type"], source["uri"], source.get("query")
    )
    stages.append(extract_result)
    if extract_result.status == StageStatus.FAILED:
        return PipelineResult(
            success=False,
            stages=stages,
            suggestion="Fix the source configuration and try again.",
        ).to_dict()

    # Stage 2: Validate
    validate_result = validate_data(rows, validations)
    stages.append(validate_result)
    if validate_result.status == StageStatus.FAILED:
        return PipelineResult(
            success=False,
            stages=stages,
            suggestion=(
                "Validation failed. Review the errors above. You can either "
                "fix the source data or adjust the validation rules."
            ),
        ).to_dict()

    # Stage 3: Transform
    transformed_rows, transform_result = apply_transforms(rows, transforms)
    stages.append(transform_result)
    if transform_result.status == StageStatus.FAILED:
        return PipelineResult(
            success=False,
            stages=stages,
            suggestion="Fix the transform configuration and try again.",
        ).to_dict()

    # Stage 4: Pause for confirmation
    confirmation = build_confirmation_request(
        transformed_rows, destination, stages
    )
    return confirmation.to_dict()

The pipeline stops at confirmation. Loading is a separate function call, invoked only after the agent gets human approval. This separation is the core safety mechanism: the agent cannot accidentally write data without the user seeing a preview first.
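To make the two-call shape concrete, here is a minimal, self-contained toy of the pattern. The names `plan_load` and `execute_load` are illustrative, not the skill's actual API; the point is that the first call writes nothing and the second refuses to proceed without explicit approval:

```python
from typing import Any


def plan_load(rows: list[dict[str, Any]], destination: str) -> dict[str, Any]:
    """First call: return a preview and the pending rows. Writes nothing."""
    return {
        "status": "awaiting_confirmation",
        "destination": destination,
        "preview": rows[:5],
        "pending_rows": rows,  # the agent carries this between calls
    }


def execute_load(pending: dict[str, Any], confirmed: bool) -> dict[str, Any]:
    """Second call: only proceeds when a human has explicitly approved."""
    if not confirmed:
        return {"status": "cancelled", "rows_written": 0}
    # A real implementation would write pending["pending_rows"] here.
    return {"status": "success", "rows_written": len(pending["pending_rows"])}
```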

Testing

import pytest
from pathlib import Path


# --- Validation tests ---

class TestValidation:
    def test_not_null_catches_empty_strings(self):
        rows = [{"name": "Alice"}, {"name": ""}, {"name": "Charlie"}]
        result = validate_data(rows, [{"column": "name", "rule": "not_null"}])
        assert result.status == StageStatus.FAILED
        assert len(result.errors) == 1
        assert "Row 2" in result.errors[0]

    def test_unique_catches_duplicates(self):
        rows = [{"id": "1"}, {"id": "2"}, {"id": "1"}]
        result = validate_data(rows, [{"column": "id", "rule": "unique"}])
        assert result.status == StageStatus.FAILED
        assert "duplicate" in result.errors[0].lower()

    def test_min_validates_numeric_threshold(self):
        rows = [{"age": "25"}, {"age": "17"}, {"age": "30"}]
        result = validate_data(rows, [{"column": "age", "rule": "min", "value": "18"}])
        assert result.status == StageStatus.FAILED
        assert "Row 2" in result.errors[0]

    def test_regex_validates_pattern(self):
        rows = [{"email": "a@b.com"}, {"email": "invalid"}]
        result = validate_data(
            rows,
            [{"column": "email", "rule": "regex", "value": r".+@.+\..+"}],
        )
        assert result.status == StageStatus.FAILED
        assert "Row 2" in result.errors[0]

    def test_one_of_validates_allowed_values(self):
        rows = [{"status": "active"}, {"status": "deleted"}]
        result = validate_data(
            rows,
            [{"column": "status", "rule": "one_of", "value": "active,inactive"}],
        )
        assert result.status == StageStatus.FAILED
        assert "deleted" in result.errors[0]

    def test_missing_column_is_an_error(self):
        rows = [{"name": "Alice"}]
        result = validate_data(rows, [{"column": "age", "rule": "not_null"}])
        assert result.status == StageStatus.FAILED
        assert "not found" in result.errors[0].lower()

    def test_unknown_rule_is_a_warning_not_error(self):
        rows = [{"name": "Alice"}]
        result = validate_data(rows, [{"column": "name", "rule": "nonexistent_rule"}])
        assert result.status != StageStatus.FAILED
        assert len(result.warnings) == 1

    def test_no_validations_returns_skipped(self):
        rows = [{"name": "Alice"}]
        result = validate_data(rows, None)
        assert result.status == StageStatus.SKIPPED

    def test_all_passing_returns_success(self):
        rows = [{"name": "Alice", "age": "25"}, {"name": "Bob", "age": "30"}]
        result = validate_data(
            rows,
            [
                {"column": "name", "rule": "not_null"},
                {"column": "name", "rule": "unique"},
            ],
        )
        assert result.status == StageStatus.SUCCESS

    def test_error_capping_limits_output(self):
        # 50 rows with null values, but only 10 errors should be reported
        rows = [{"name": ""} for _ in range(50)]
        result = validate_data(rows, [{"column": "name", "rule": "not_null"}])
        # 10 individual errors + 1 summary = 11
        assert len(result.errors) == 11
        assert "and 40 more" in result.errors[-1]


# --- Transform tests ---

class TestTransforms:
    def test_rename_column(self):
        rows = [{"old_name": "value"}]
        result_rows, stage = apply_transforms(
            rows,
            [{"type": "rename_column", "params": {"from": "old_name", "to": "new_name"}}],
        )
        assert "new_name" in result_rows[0]
        assert "old_name" not in result_rows[0]

    def test_cast_type_to_int(self):
        rows = [{"count": "42"}]
        result_rows, _ = apply_transforms(
            rows,
            [{"type": "cast_type", "params": {"column": "count", "target": "int"}}],
        )
        assert result_rows[0]["count"] == 42
        assert isinstance(result_rows[0]["count"], int)

    def test_filter_rows(self):
        rows = [{"status": "active"}, {"status": "inactive"}, {"status": "active"}]
        result_rows, stage = apply_transforms(
            rows,
            [{"type": "filter_rows", "params": {"column": "status", "operator": "eq", "value": "active"}}],
        )
        assert len(result_rows) == 2
        assert stage.status == StageStatus.SUCCESS

    def test_drop_column(self):
        rows = [{"keep": 1, "drop_me": 2}]
        result_rows, _ = apply_transforms(
            rows,
            [{"type": "drop_column", "params": {"column": "drop_me"}}],
        )
        assert "drop_me" not in result_rows[0]
        assert "keep" in result_rows[0]

    def test_transform_failure_returns_original_rows(self):
        rows = [{"name": "Alice"}]
        result_rows, stage = apply_transforms(
            rows,
            [{"type": "cast_type", "params": {}}],  # Missing required "column"
        )
        assert stage.status == StageStatus.FAILED
        assert result_rows == rows  # Original rows returned

    def test_transforms_do_not_mutate_input(self):
        rows = [{"name": "Alice"}]
        original = [dict(r) for r in rows]
        apply_transforms(
            rows,
            [{"type": "rename_column", "params": {"from": "name", "to": "full_name"}}],
        )
        assert rows == original  # Input unchanged


# --- Extraction tests ---

class TestExtraction:
    @pytest.mark.asyncio
    async def test_csv_file_not_found(self):
        rows, result = await extract_data("csv", "/nonexistent/file.csv")
        assert result.status == StageStatus.FAILED
        assert "not found" in result.message.lower()
        assert rows == []

    @pytest.mark.asyncio
    async def test_csv_extraction(self, tmp_path: Path):
        csv_file = tmp_path / "test.csv"
        csv_file.write_text("name,age\nAlice,30\nBob,25\n")
        rows, result = await extract_data("csv", str(csv_file))
        assert result.status == StageStatus.SUCCESS
        assert len(rows) == 2
        assert rows[0]["name"] == "Alice"

    @pytest.mark.asyncio
    async def test_sql_blocks_destructive_queries(self):
        rows, result = await extract_data(
            "sql", "sqlite:///test.db", query="DROP TABLE users"
        )
        assert result.status == StageStatus.FAILED
        assert "SELECT" in result.errors[0]

    @pytest.mark.asyncio
    async def test_sql_requires_query(self):
        rows, result = await extract_data("sql", "sqlite:///test.db")
        assert result.status == StageStatus.FAILED
        assert "query" in result.message.lower()

    @pytest.mark.asyncio
    async def test_json_api_rejects_file_protocol(self):
        rows, result = await extract_data("json_api", "file:///etc/passwd")
        assert result.status == StageStatus.FAILED
        assert "http" in result.message.lower()

    @pytest.mark.asyncio
    async def test_unknown_source_type(self):
        rows, result = await extract_data("xml", "/some/file.xml")
        assert result.status == StageStatus.FAILED
        assert "Unknown source type" in result.message


# --- Confirmation tests ---

class TestConfirmation:
    def test_preview_includes_first_five_rows(self):
        rows = [{"id": i} for i in range(20)]
        result = build_confirmation_request(
            rows, {"type": "csv", "uri": "out.csv"}, []
        )
        confirm_stage = result.stages[-1]
        assert confirm_stage.status == StageStatus.AWAITING_CONFIRMATION
        assert len(confirm_stage.preview) == 5

    def test_replace_mode_adds_warning(self):
        rows = [{"id": 1}]
        result = build_confirmation_request(
            rows, {"type": "csv", "uri": "out.csv", "mode": "replace"}, []
        )
        confirm_stage = result.stages[-1]
        assert any("overwrite" in w for w in confirm_stage.warnings)

    def test_append_mode_has_no_warning(self):
        rows = [{"id": 1}]
        result = build_confirmation_request(
            rows, {"type": "csv", "uri": "out.csv", "mode": "append"}, []
        )
        confirm_stage = result.stages[-1]
        assert len(confirm_stage.warnings) == 0


# --- Integration tests ---

class TestPipelineIntegration:
    @pytest.mark.asyncio
    async def test_full_pipeline_pauses_at_confirmation(self, tmp_path: Path):
        csv_file = tmp_path / "input.csv"
        csv_file.write_text("name,age\nAlice,30\nBob,25\n")

        result = await run_data_pipeline(
            source={"type": "csv", "uri": str(csv_file)},
            destination={"type": "csv", "uri": str(tmp_path / "output.csv")},
        )

        assert result["success"] is True
        stages = result["stages"]
        assert stages[-1]["status"] == "awaiting_confirmation"
        assert stages[-1]["preview"] is not None

    @pytest.mark.asyncio
    async def test_pipeline_stops_on_validation_failure(self, tmp_path: Path):
        csv_file = tmp_path / "input.csv"
        csv_file.write_text("name,age\nAlice,\nBob,25\n")

        result = await run_data_pipeline(
            source={"type": "csv", "uri": str(csv_file)},
            destination={"type": "csv", "uri": str(tmp_path / "output.csv")},
            validations=[{"column": "age", "rule": "not_null"}],
        )

        assert result["success"] is False
        # Load stage should never be reached
        stage_names = [s["stage"] for s in result["stages"]]
        assert "load" not in stage_names

    @pytest.mark.asyncio
    async def test_pipeline_stops_on_extraction_failure(self):
        result = await run_data_pipeline(
            source={"type": "csv", "uri": "/nonexistent/file.csv"},
            destination={"type": "csv", "uri": "/tmp/output.csv"},
        )

        assert result["success"] is False
        assert result["stages"][0]["status"] == "failed"
        assert result["suggestion"] is not None

Testing philosophy

The tests are organized by stage, matching the pipeline’s own structure. Each stage has isolated unit tests, and the integration tests verify the pipeline’s most important invariant: early termination. When extraction fails, validation never runs; when validation fails, transformation never runs; and loading never runs without confirmation.

Design tradeoffs

Two-call confirmation vs. single-call with callback. We split the pipeline into two calls: one that runs through confirmation, and a separate load_data call. An alternative is a callback-based approach where the skill suspends execution and resumes when the user approves. The two-call approach is simpler and more portable (it works with any agent framework) but requires the agent to manage the intermediate state (the transformed rows) between calls.

Validation before transformation vs. after. We validate the raw extracted data before transforming it. Validating after transformation would catch more issues (like transforms producing invalid data) but makes error messages confusing, because the user sees errors against transformed column names they might not recognize. We went with “validate early” for clarity, expecting that transforms are simpler and less likely to produce invalid data than raw sources.

Error accumulation vs. fail-fast. Validation collects all errors before failing, rather than stopping at the first one. For data pipelines, this is the right call. A single validation pass that reports all issues is far more useful than making the user fix one error at a time across multiple runs.
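The error cap that `test_error_capping_limits_output` exercises (ten detailed errors plus a summary line) boils down to a small helper. A sketch, assuming a cap of 10 to match the test expectations; the skill's internal name for this is not shown in the article:

```python
MAX_REPORTED_ERRORS = 10


def cap_errors(errors: list[str], limit: int = MAX_REPORTED_ERRORS) -> list[str]:
    """Keep the first `limit` errors and summarize the rest in one line."""
    if len(errors) <= limit:
        return errors
    return errors[:limit] + [f"... and {len(errors) - limit} more"]
```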

Key takeaways

- Always pause before destructive operations. The human-in-the-loop pattern is non-negotiable for skills that write data.
- Report stage-level status so the agent knows exactly which stage failed, not just that "the pipeline failed."
- Validate early and catch bad data before it propagates through transformations.
- Return the original data on transform failure so the agent never ends up with corrupted intermediate state.
- Cap error output: a validation that produces 10,000 errors is no more useful than one that produces 10.
- Treat agent inputs as untrusted: validate URLs, block destructive SQL, restrict expression evaluation.