This example shows how to build resilient AI workflows using Prefect and pydantic-ai. The integration provides automatic retries for LLM calls, full observability of agent decisions, and durable execution semantics that make workflows idempotent and rerunnable.

The Scenario: AI Data Analyst

You need to analyze datasets programmatically, but writing custom analysis code for each dataset is time-consuming. Instead, you’ll build an AI agent that:
  1. Understands your dataset structure
  2. Decides which analyses are most valuable
  3. Uses Python tools to calculate statistics and detect anomalies
  4. Generates actionable insights
The agent does all of this while staying resilient to LLM failures, tool errors, and network issues.
This example demonstrates:
  • PrefectAgent – Wraps pydantic-ai agents for durable execution
  • Agent Tools – Python functions the AI can call, automatically wrapped as Prefect tasks
  • TaskConfig – Custom retry policies and timeouts for AI operations
  • Durable Execution – Automatic idempotency and failure recovery

Setup

Install dependencies (if not already installed):
uv add "pydantic-ai[prefect]" pandas
# or with pip:
pip install "pydantic-ai[prefect]" pandas

The example script then starts with its imports:
from __future__ import annotations

from typing import Any

import pandas as pd
from pydantic import BaseModel, Field
from pydantic_ai import Agent, RunContext
from pydantic_ai.durable_exec.prefect import PrefectAgent, TaskConfig

from prefect import flow, task

Agent Tools

These functions are “tools” that the AI agent can call to analyze data. Prefect automatically wraps each tool execution as a task for observability and retries.
def calculate_statistics(ctx: RunContext[pd.DataFrame], column: str) -> dict[str, Any]:
    """Calculate descriptive statistics for a column.

    The AI agent can call this tool to understand data distribution,
    and Prefect ensures it retries on failure."""
    df = ctx.deps
    if column not in df.columns:
        return {"error": f"Column '{column}' not found. Available: {list(df.columns)}"}

    stats = df[column].describe().to_dict()
    stats["missing_count"] = int(df[column].isna().sum())
    stats["unique_count"] = int(df[column].nunique())
    return {
        k: (float(v) if isinstance(v, (int, float)) else v) for k, v in stats.items()
    }


def detect_anomalies(
    ctx: RunContext[pd.DataFrame], column: str, threshold: float = 3.0
) -> list[dict[str, Any]]:
    """Detect anomalies using standard deviation method.

    Identifies values that are more than `threshold` standard deviations from the mean.
    This tool demonstrates how complex analysis logic can be made reliable with Prefect."""
    df = ctx.deps
    if column not in df.columns:
        return [{"error": f"Column '{column}' not found"}]

    if not pd.api.types.is_numeric_dtype(df[column]):
        return [{"error": f"Column '{column}' is not numeric"}]

    mean = df[column].mean()
    std = df[column].std()

    if std == 0:
        return []

    anomalies = df[abs(df[column] - mean) > (threshold * std)]
    return [
        {
            "index": int(idx),
            "value": float(row[column]),
            "z_score": float((row[column] - mean) / std),
        }
        for idx, row in anomalies.head(10).iterrows()
    ]


def get_column_info(ctx: RunContext[pd.DataFrame]) -> dict[str, Any]:
    """Get overview of all columns in the dataset.

    Helps the AI agent understand the dataset structure before analysis."""
    df = ctx.deps
    return {
        "columns": list(df.columns),
        "shape": {"rows": len(df), "columns": len(df.columns)},
        "dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()},
    }
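
To see what the standard-deviation rule in detect_anomalies flags, the sketch below applies the same test to the sales values used in the sample dataset of this example. It is a stand-in that uses Python's statistics module instead of pandas (both compute the sample standard deviation by default), and the helper name flag_outliers is hypothetical.

```python
from statistics import mean, stdev


def flag_outliers(values: list[float], threshold: float = 3.0) -> list[dict[str, float]]:
    """Return entries lying more than `threshold` sample standard deviations from the mean."""
    mu = mean(values)
    sigma = stdev(values)  # sample standard deviation (n - 1 denominator)
    if sigma == 0:
        return []
    return [
        {"index": i, "value": v, "z_score": (v - mu) / sigma}
        for i, v in enumerate(values)
        if abs(v - mu) > threshold * sigma
    ]


# Same values as the sample sales column: 98 ordinary rows plus two spikes.
sales = [100, 150, 200, 110, 145] * 19 + [100, 150, 200, 1000, 2000]
outliers = flag_outliers(sales)
for item in outliers:
    print(item["index"], item["value"], round(item["z_score"], 2))
```

With the default threshold of 3.0, only the two spikes at positions 98 and 99 are flagged. A single extreme value inflates the standard deviation, so on small datasets a lower threshold may be needed to catch milder outliers.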


Analysis Results Model

Structured output ensures the AI returns consistent, parseable results.
class DataAnalysis(BaseModel):
    """Structured analysis results from the AI agent."""

    summary: str = Field(description="High-level summary of the dataset")
    key_findings: list[str] = Field(
        description="Key findings discovered from the data", min_length=3, max_length=5
    )
    recommendations: list[str] = Field(
        description="Actionable recommendations based on the findings",
        min_length=3,
        max_length=5,
    )
    columns_analyzed: list[str] = Field(
        description="List of columns that were analyzed"
    )

    def __str__(self) -> str:
        """Format the analysis results for clean display."""
        findings = "\n".join(
            f"  {i}. {finding}" for i, finding in enumerate(self.key_findings, 1)
        )
        recommendations = "\n".join(
            f"  {i}. {rec}" for i, rec in enumerate(self.recommendations, 1)
        )

        return f"""
{"=" * 80}
ANALYSIS RESULTS
{"=" * 80}

📋 Summary:
{self.summary}

🔑 Key Findings:
{findings}

💡 Recommendations:
{recommendations}

📊 Columns Analyzed: {", ".join(self.columns_analyzed)}
{"=" * 80}
"""
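
The length constraints on key_findings and recommendations are enforced when the output is validated. As a minimal sketch, using a trimmed-down, hypothetical model named FindingsOnly rather than the full DataAnalysis class, a payload with too few findings is rejected:

```python
from pydantic import BaseModel, Field, ValidationError


class FindingsOnly(BaseModel):
    """Hypothetical cut-down model with the same list constraint as DataAnalysis."""

    key_findings: list[str] = Field(min_length=3, max_length=5)


def is_valid(payload: dict) -> bool:
    """Report whether a payload satisfies the schema."""
    try:
        FindingsOnly.model_validate(payload)
    except ValidationError:
        return False
    return True


too_few = {"key_findings": ["Sales spike in the final rows", "Gadget sales are flat"]}
enough = {"key_findings": ["finding one", "finding two", "finding three"]}
print(is_valid(too_few), is_valid(enough))
```

The first payload has only two findings and fails validation; the second has three and passes.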


Creating the AI Agent

We configure the agent with tools and wrap it with PrefectAgent for durability.
def create_data_analyst_agent() -> PrefectAgent[pd.DataFrame, DataAnalysis]:
    """Create an AI data analyst with Prefect durability.

    The PrefectAgent wrapper automatically:
    - Wraps agent.run as a Prefect flow
    - Wraps LLM calls as Prefect tasks with retries
    - Wraps tool calls as separate Prefect tasks
    """

    # Create the base pydantic-ai agent
    agent = Agent(
        "openai:gpt-4o",
        name="data-analyst-agent",
        output_type=DataAnalysis,
        deps_type=pd.DataFrame,
        # Register tools that the agent can use
        tools=[calculate_statistics, detect_anomalies, get_column_info],
        system_prompt=(
            "You are an expert data analyst. Analyze the provided dataset using "
            "the available tools. Focus on finding meaningful patterns, anomalies, "
            "and actionable insights. Always start by understanding the dataset "
            "structure with get_column_info."
        ),
    )

    # Wrap with PrefectAgent for durable execution with custom retry policy
    return PrefectAgent(
        agent,
        model_task_config=TaskConfig(
            retries=3,  # Retry LLM calls up to 3 times
            retry_delay_seconds=[1.0, 2.0, 4.0],  # Exponential backoff
            timeout_seconds=60.0,  # 60s timeout for LLM calls
        ),
        tool_task_config=TaskConfig(
            retries=2,  # Retry tool calls up to 2 times
            retry_delay_seconds=[0.5, 1.0],
        ),
    )
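
To make the retry settings concrete, here is a rough sketch of how a retry count and a list of delays interact. It is plain Python for illustration only, not Prefect's implementation; call_with_retries and the injected sleep parameter are hypothetical names.

```python
import time
from typing import Callable, Sequence, TypeVar

T = TypeVar("T")


def call_with_retries(
    fn: Callable[[], T],
    retries: int,
    delays: Sequence[float],
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn once, then retry up to `retries` times, waiting delays[i] before retry i."""
    for attempt in range(retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == retries:
                raise  # retries exhausted: surface the last error
            sleep(delays[min(attempt, len(delays) - 1)])
    raise AssertionError("unreachable")


# A stand-in for an LLM call that fails twice before succeeding.
attempts = {"count": 0}


def flaky_call() -> str:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise TimeoutError("simulated transient failure")
    return "ok"


waited: list[float] = []
result = call_with_retries(flaky_call, retries=3, delays=[1.0, 2.0, 4.0], sleep=waited.append)
print(result, waited)
```

Here the call succeeds on the third attempt, so only the first two delays (1.0s and 2.0s) are used. Passing waited.append as the sleep function records the delays instead of actually pausing.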


Sample Dataset Generator

Create a realistic sales dataset for demonstration.
@task
def create_sample_dataset() -> pd.DataFrame:
    """Generate a sample sales dataset with some anomalies.

    In production, you'd load real data from a file, database, or API."""
    data = {
        "product": ["Widget", "Gadget", "Doohickey", "Widget", "Gadget"] * 20,
        "sales": [100, 150, 200, 110, 145] * 19
        + [100, 150, 200, 1000, 2000],  # Last 2 are anomalies
        "region": ["North", "South", "East", "West", "Central"] * 20,
        "month": [1, 2, 3, 4, 5] * 20,
    }
    return pd.DataFrame(data)


Main Analysis Flow

Orchestrate the entire AI analysis workflow with Prefect.
@flow(name="ai-data-analyst", log_prints=True)
async def analyze_dataset_with_ai() -> DataAnalysis:
    """Run AI-powered data analysis with automatic retries and observability.

    This flow demonstrates how Prefect makes AI workflows production-ready:
    1. Dataset preparation is tracked as a task
    2. AI agent execution is wrapped for durability
    3. All LLM and tool calls are logged and retryable
    4. Results are structured and validated with Pydantic
    """

    # Prepare the dataset
    print("📊 Preparing dataset...")
    df = create_sample_dataset()
    print(f"Dataset shape: {df.shape}\n")

    # Create the AI agent with Prefect durability
    print("🤖 Initializing AI data analyst...")
    agent = create_data_analyst_agent()

    # Run the analysis - all LLM and tool calls are automatically retried on failure
    print("🔍 Running AI analysis...\n")
    result = await agent.run(
        "Analyze this sales dataset. Identify patterns, anomalies, and provide recommendations.",
        deps=df,
    )

    # Display results
    print(result.output)

    return result.output


Serve the Flow

To get full durable execution with automatic idempotency, serve the flow to create a deployment. Deployed flows enable Prefect’s transactional semantics for agent operations.
if __name__ == "__main__":
    import os
    import sys

    # Check if OpenAI API key is set
    if not os.getenv("OPENAI_API_KEY"):
        print("❌ Error: OPENAI_API_KEY environment variable not set")
        print("Set it with: export OPENAI_API_KEY='your-key-here'")
        sys.exit(1)

    # Serve the flow - this creates a deployment and starts a long-running process that executes its runs
    analyze_dataset_with_ai.serve(
        name="ai-data-analyst-deployment",
        tags=["ai", "pydantic-ai", "data-analysis"],
    )

Triggering Flow Runs

Once served, trigger runs from the Prefect UI or the CLI.
Prefect UI:
  1. Navigate to http://localhost:4200
  2. Go to Deployments → “ai-data-analyst-deployment”
  3. Click “Run” → “Quick Run”
CLI:
prefect deployment run ai-data-analyst/ai-data-analyst-deployment --watch

Local Testing

For quick local testing without deployment:
import asyncio
asyncio.run(analyze_dataset_with_ai())

What Just Happened?

When you serve and trigger this flow, Prefect and pydantic-ai work together to create a resilient AI pipeline:
  1. Deployment Creation – serve() creates a deployment and starts a long-running process that listens for and executes flow runs
  2. Durable AI Execution – The PrefectAgent wrapper makes all AI operations retryable:
    • LLM calls retry up to 3 times with exponential backoff (1s, 2s, 4s)
    • Tool calls retry up to 2 times
    • LLM calls respect a 60s timeout (no timeout is configured for tool calls)
  3. Tool Observability – Each time the AI calls a tool (get_column_info, calculate_statistics, detect_anomalies), the call is run as a Prefect task
  4. Structured Results – Pydantic validates the AI’s output, ensuring it matches the expected schema
  5. Automatic Idempotency – When a deployed flow run is retried, Prefect’s transactional semantics ensure that completed tasks are skipped and only failed operations are re-executed. This prevents duplicate API calls and wasted compute.
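
The skip-completed-work idea can be illustrated with a small sketch. This is not how Prefect stores task results; it is a hypothetical in-memory cache (run_step, completed) that shows why a rerun repeats only the step that failed.

```python
from typing import Callable

completed: dict[str, str] = {}  # hypothetical store of finished step results
calls: list[str] = []  # records which steps actually executed


def run_step(key: str, fn: Callable[[], str]) -> str:
    """Run fn unless a result for `key` was already recorded."""
    if key in completed:
        return completed[key]
    calls.append(key)
    result = fn()
    completed[key] = result  # only recorded if fn did not raise
    return result


def pipeline(fail_second: bool) -> str:
    first = run_step("load", lambda: "data")

    def analyze() -> str:
        if fail_second:
            raise RuntimeError("simulated LLM outage")
        return f"analysis of {first}"

    return run_step("analyze", analyze)


try:
    pipeline(fail_second=True)  # first run: "load" succeeds, "analyze" fails
except RuntimeError:
    pass

final = pipeline(fail_second=False)  # rerun: "load" is skipped, "analyze" runs again
print(calls, final)
```

The "load" step executes once across both runs, while "analyze" executes twice: once when it fails and once on the rerun.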

Key Takeaways

  • Deploy for Durability – Use flow.serve() or flow.deploy() to unlock automatic idempotency and transactional semantics
  • Retry Intelligence – Failed flow runs can be retried from the UI, skipping already-completed tasks
  • Tool Observability – Every AI decision and tool call is tracked, logged, and independently retryable
  • Zero Boilerplate – Just wrap your pydantic-ai agent with PrefectAgent
  • Customizable Policies – Fine-tune retries, timeouts, and error handling per operation type
Try it yourself:
  1. Set your OpenAI API key: export OPENAI_API_KEY='your-key'
  2. Start the Prefect server: prefect server start
  3. Serve the flow: uv run -s examples/ai_data_analyst_with_pydantic_ai.py
  4. Trigger a run from the UI (http://localhost:4200) or CLI
  5. Watch all AI operations tracked in real-time