KEMBAR78
Pydantic Ai Implementation Guide | PDF | Information Retrieval | Sql
0% found this document useful (0 votes)
1K views26 pages

Pydantic Ai Implementation Guide

Uploaded by

lionelnumtema
Copyright
© © All Rights Reserved
We take content rights seriously. If you suspect this is your content, claim it here.
Available Formats
Download as PDF, TXT or read online on Scribd
0% found this document useful (0 votes)
1K views26 pages

Pydantic Ai Implementation Guide

Uploaded by

lionelnumtema
Copyright
© © All Rights Reserved
We take content rights seriously. If you suspect this is your content, claim it here.
Available Formats
Download as PDF, TXT or read online on Scribd
You are on page 1/ 26

Pydantic AI Implementation Guide

Table of Contents
1. Introduction
2. Quick Start Guide
3. Setting Up Your Environment
4. Defining Agents
5. Creating and Registering Tools
6. Dependency Injection
7. MCP Server Setup
8. Agent Types and Use Cases
9. End-to-End Example
10. Testing and Debugging
11. Deployment Considerations
12. Appendix: Advanced Patterns

Introduction
This implementation guide provides practical instructions and code examples for
building an AI agent platform using Pydantic AI. It covers everything from basic agent
setup to complex workflows and deployment strategies. The guide is designed to help
developers implement the architecture described in the platform architecture
document while following the best practices outlined in the best practices guide.

Quick Start Guide


Here’s a checklist to get your Pydantic AI agent platform up and running quickly:

1. Install Pydantic AI and dependencies


2. Define your agent types and their capabilities
3. Create tools and register them with your agents
4. Set up dependency injection for external services
5. Configure the MCP server for agent communication
6. Implement monitoring and observability
7. Deploy your platform

Let’s start with a simple example:

# Install Pydantic AI
# pip install pydantic-ai

from pydantic_ai import Agent


from pydantic import BaseModel, Field

# Define a simple agent


agent = Agent(
'openai:gpt-4',
system_prompt="You are a helpful assistant that provides concise answers."
)

# Run the agent


response = agent.run_sync("What is Pydantic AI?")
print(response.output)

Setting Up Your Environment


Installation
# Create a virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate

# Install Pydantic AI and dependencies


pip install pydantic-ai
pip install fastapi uvicorn # For MCP server
pip install pytest pytest-asyncio # For testing

Project Structure

A typical Pydantic AI platform project might be structured as follows:

my_ai_platform/
├── agents/
│ ├── __init__.py
│ ├── base.py
│ ├── retrieval.py
│ ├── planning.py
│ └── conversation.py
├── tools/
│ ├── __init__.py
│ ├── registry.py
│ ├── search.py
│ ├── database.py
│ └── external_apis.py
├── services/
│ ├── __init__.py
│ ├── database.py
│ ├── vector_store.py
│ └── authentication.py
├── models/
│ ├── __init__.py
│ ├── input_types.py
│ └── output_types.py
├── mcp/
│ ├── __init__.py
│ ├── server.py
│ └── client.py
├── config/
│ ├── __init__.py
│ ├── settings.py
│ └── logging.py
├── tests/
│ ├── __init__.py
│ ├── test_agents.py
│ └── test_tools.py
├── main.py
├── Dockerfile
└── docker-compose.yml

Defining Agents
Agents are the core building blocks of your AI platform. Here’s how to define different
types of agents:

Basic Agent
from pydantic_ai import Agent

# Simple agent with a static system prompt


basic_agent = Agent(
'openai:gpt-4',
system_prompt="You are a helpful assistant that provides concise answers."
)

Agent with Structured Output


from pydantic_ai import Agent
from pydantic import BaseModel, Field
from enum import Enum

class Sentiment(Enum):
POSITIVE = "positive"
NEUTRAL = "neutral"
NEGATIVE = "negative"

class SentimentAnalysis(BaseModel):
sentiment: Sentiment
confidence: float = Field(ge=0.0, le=1.0)
explanation: str

# Agent that returns structured output


sentiment_agent = Agent(
'anthropic:claude-3-opus-20240229',
system_prompt="Analyze the sentiment of the given text.",
output_type=SentimentAnalysis
)

result = sentiment_agent.run_sync("I absolutely love this product! It's amazing!")


print(f"Sentiment: {result.data.sentiment}")
print(f"Confidence: {result.data.confidence}")
print(f"Explanation: {result.data.explanation}")

Agent with Dynamic System Prompt


from pydantic_ai import Agent, RunContext
from dataclasses import dataclass

@dataclass
class UserContext:
user_name: str
user_preferences: dict
language: str = "en"

# Agent with dynamic system prompt based on user context


personalized_agent = Agent(
'google-gla:gemini-1.5-pro',
deps_type=UserContext
)

@personalized_agent.system_prompt
def personalized_prompt(ctx: RunContext[UserContext]) -> str:
return f"""
You are a personal assistant for {ctx.deps.user_name}.
Preferred language: {ctx.deps.language}
User preferences: {ctx.deps.user_preferences}

Always tailor your responses to match these preferences.


"""

# Using the agent with specific user context


user_context = UserContext(
user_name="Alice",
user_preferences={"tone": "casual", "detail_level": "high"},
language="en"
)

response = personalized_agent.run_sync(
"What restaurants would you recommend?",
deps=user_context
)

Async Agent
from pydantic_ai import Agent
import asyncio

async_agent = Agent(
'openai:gpt-4',
system_prompt="You are a helpful assistant."
)

async def process_query(query: str):


result = await async_agent.run(query)
return result.output

# Run multiple queries concurrently


async def process_multiple_queries(queries: list[str]):
tasks = [process_query(query) for query in queries]
results = await asyncio.gather(*tasks)
return results

# Example usage
queries = ["What is AI?", "Explain quantum computing", "How does blockchain work?"]
results = asyncio.run(process_multiple_queries(queries))

Stateful Agent
from pydantic_ai import Agent, RunContext
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class ConversationState:
history: List[dict]
user_id: str
last_topic: Optional[str] = None

# Stateful agent that maintains conversation history


stateful_agent = Agent(
'anthropic:claude-3-sonnet-20240229',
deps_type=ConversationState
)

@stateful_agent.system_prompt
def conversation_prompt(ctx: RunContext[ConversationState]) -> str:
history_str = "\n".join([
f"User: {msg['user']}\nAssistant: {msg['assistant']}"
for msg in ctx.deps.history[-5:] # Last 5 messages
])

return f"""
You are a conversational assistant for user {ctx.deps.user_id}.

Recent conversation history:


{history_str}

Last topic discussed: {ctx.deps.last_topic or 'None'}

Maintain context from the conversation history.


"""

# Example usage
state = ConversationState(
history=[
{"user": "Hello", "assistant": "Hi there! How can I help you today?"},
{"user": "Tell me about machine learning", "assistant": "Machine learning is..."}
],
user_id="user123",
last_topic="machine learning"
)

response = stateful_agent.run_sync("How is it different from deep learning?", deps=state)

# Update state with new interaction


state.history.append({
"user": "How is it different from deep learning?",
"assistant": response.output
})
state.last_topic = "deep learning vs machine learning"

Creating and Registering Tools


Tools allow agents to interact with external systems and perform specific functions.
Here’s how to create and register tools:

Basic Tool Registration


from pydantic_ai import Agent

weather_agent = Agent(
'openai:gpt-4',
system_prompt="You provide weather information."
)

@weather_agent.tool_plain
def get_current_temperature(city: str) -> float:
"""
Get the current temperature in Celsius for the specified city.

Args:
city: The name of the city to get the temperature for

Returns:
The current temperature in Celsius
"""
# In a real implementation, this would call a weather API
# This is a mock implementation for demonstration
mock_temperatures = {
"new york": 22.5,
"london": 18.0,
"tokyo": 26.3,
"sydney": 20.1
}
return mock_temperatures.get(city.lower(), 20.0) # Default to 20°C if city not found

Tool with Context


from pydantic_ai import Agent, RunContext
from dataclasses import dataclass
import aiohttp

@dataclass
class APICredentials:
api_key: str
base_url: str

search_agent = Agent(
'anthropic:claude-3-haiku-20240307',
deps_type=APICredentials,
system_prompt="You help users find information online."
)

@search_agent.tool
async def search_web(ctx: RunContext[APICredentials], query: str, max_results: int = 5) ->
list[dict]:
"""
Search the web for information related to the query.

Args:
query: The search query
max_results: Maximum number of results to return (default: 5)

Returns:
A list of search results, each containing a title, URL, and snippet
"""
headers = {"Authorization": f"Bearer {ctx.deps.api_key}"}
params = {"q": query, "limit": max_results}

async with aiohttp.ClientSession() as session:


async with session.get(f"{ctx.deps.base_url}/search", headers=headers,
params=params) as response:
if response.status == 200:
data = await response.json()
return data.get("results", [])
else:
return []

Tool Registry

For larger applications, it’s helpful to organize tools in a central registry:

# tools/registry.py
from typing import Dict, Callable, Any, List
from pydantic_ai import Agent

class ToolRegistry:
def __init__(self):
self._tools: Dict[str, Dict[str, Callable]] = {}

def register_tool(self, category: str, name: str, tool_func: Callable) -> None:
"""Register a tool function under a specific category."""
if category not in self._tools:
self._tools[category] = {}

self._tools[category][name] = tool_func

def get_tool(self, category: str, name: str) -> Callable:


"""Get a specific tool by category and name."""
return self._tools.get(category, {}).get(name)

def get_category_tools(self, category: str) -> Dict[str, Callable]:


"""Get all tools in a category."""
return self._tools.get(category, {})

def get_all_tools(self) -> Dict[str, Dict[str, Callable]]:


"""Get all registered tools."""
return self._tools

def register_with_agent(self, agent: Agent, category: str = None) -> None:


"""Register all tools (or tools from a specific category) with an agent."""
if category:
tools = self.get_category_tools(category)
for name, tool_func in tools.items():
agent.add_tool(tool_func)
else:
for category in self._tools:
for name, tool_func in self._tools[category].items():
agent.add_tool(tool_func)

# Example usage
registry = ToolRegistry()

# Register tools
def search_wikipedia(query: str) -> List[dict]:
"""Search Wikipedia for information."""
# Implementation
return []

def search_news(query: str, days: int = 7) -> List[dict]:


"""Search recent news articles."""
# Implementation
return []

registry.register_tool("search", "wikipedia", search_wikipedia)


registry.register_tool("search", "news", search_news)

# Create agent and register tools


agent = Agent('openai:gpt-4', system_prompt="You help find information.")
registry.register_with_agent(agent, category="search")

Dependency Injection
Dependency injection allows you to provide external services and data to your agents
and tools:

Basic Dependency Injection


from pydantic_ai import Agent, RunContext
from dataclasses import dataclass
from typing import List, Dict, Any

@dataclass
class DatabaseService:
connection_string: str

async def query(self, sql: str) -> List[Dict[str, Any]]:


# In a real implementation, this would connect to a database
# This is a mock implementation for demonstration
if "users" in sql.lower():
return [
{"id": 1, "name": "Alice", "email": "alice@example.com"},
{"id": 2, "name": "Bob", "email": "bob@example.com"}
]
return []

@dataclass
class AppDependencies:
db: DatabaseService
user_id: int
api_key: str

# Create agent with dependencies


data_agent = Agent(
'openai:gpt-4',
deps_type=AppDependencies,
system_prompt="You help users query data from the database."
)

@data_agent.tool
async def query_database(ctx: RunContext[AppDependencies], sql: str) -> List[Dict[str,
Any]]:
"""
Execute an SQL query against the database.

Args:
sql: The SQL query to execute

Returns:
The query results as a list of dictionaries
"""
# Add security check to prevent SQL injection
if "DROP" in sql.upper() or "DELETE" in sql.upper():
raise ValueError("Destructive SQL operations are not allowed")

# Log the query with user ID for audit purposes


print(f"User {ctx.deps.user_id} executed query: {sql}")

# Execute the query


return await ctx.deps.db.query(sql)

# Example usage
db_service = DatabaseService(connection_string="postgresql://user:pass@localhost/db")
dependencies = AppDependencies(
db=db_service,
user_id=123,
api_key="sk-api-key"
)

result = data_agent.run_sync(
"Find all users in the database",
deps=dependencies
)

Service Layer

For more complex applications, you might want to create a service layer:

# services/database.py
import asyncpg
from typing import List, Dict, Any, Optional

class DatabaseService:
def __init__(self, connection_string: str):
self.connection_string = connection_string
self.pool = None

async def initialize(self):


"""Initialize the connection pool."""
self.pool = await asyncpg.create_pool(self.connection_string)

async def close(self):


"""Close the connection pool."""
if self.pool:
await self.pool.close()

async def query(self, sql: str, params: Optional[List[Any]] = None) -> List[Dict[str,
Any]]:
"""Execute a query and return results as dictionaries."""
if not self.pool:
await self.initialize()

async with self.pool.acquire() as conn:


stmt = await conn.prepare(sql)
records = await stmt.fetch(*(params or []))
return [dict(record) for record in records]

async def execute(self, sql: str, params: Optional[List[Any]] = None) -> str:
"""Execute a command and return status."""
if not self.pool:
await self.initialize()

async with self.pool.acquire() as conn:


status = await conn.execute(sql, *(params or []))
return status

# services/vector_store.py
import numpy as np
from typing import List, Dict, Any, Optional

class VectorStoreService:
def __init__(self, connection_string: str):
self.connection_string = connection_string
# Initialize your vector database client here

async def search(self, query_vector: List[float], top_k: int = 5) -> List[Dict[str,


Any]]:
"""Search for similar vectors."""
# In a real implementation, this would query a vector database
# This is a mock implementation for demonstration
return [
{"id": 1, "content": "Sample document 1", "score": 0.92},
{"id": 2, "content": "Sample document 2", "score": 0.85}
]

async def insert(self, id: str, vector: List[float], metadata: Dict[str, Any]) ->
bool:
"""Insert a vector with metadata."""
# Implementation
return True

# services/service_container.py
from dataclasses import dataclass
from .database import DatabaseService
from .vector_store import VectorStoreService

@dataclass
class ServiceContainer:
db: DatabaseService
vector_store: VectorStoreService

@classmethod
def create_from_config(cls, config: dict):
"""Create a service container from configuration."""
db = DatabaseService(config["database_url"])
vector_store = VectorStoreService(config["vector_store_url"])
return cls(db=db, vector_store=vector_store)

async def initialize(self):


"""Initialize all services."""
await self.db.initialize()
# Initialize other services as needed

async def close(self):


"""Close all services."""
await self.db.close()
# Close other services as needed

MCP Server Setup


The Model Context Protocol (MCP) server provides a standardized way for agents to
communicate with external services:

Basic MCP Server


# mcp/server.py
from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import APIKeyHeader
from pydantic import BaseModel
from typing import Dict, Any, List, Optional
import uvicorn

app = FastAPI(title="Pydantic AI MCP Server")

# Security
API_KEY = "your-secret-api-key" # In production, use environment variables
api_key_header = APIKeyHeader(name="X-API-Key")

def verify_api_key(api_key: str = Depends(api_key_header)):


if api_key != API_KEY:
raise HTTPException(status_code=401, detail="Invalid API key")
return api_key

# Models
class MCPRequest(BaseModel):
agent_id: str
function_name: str
parameters: Dict[str, Any]
context: Optional[Dict[str, Any]] = None

class MCPResponse(BaseModel):
status: str
data: Any
error: Optional[str] = None

# Routes
@app.post("/invoke", response_model=MCPResponse, dependencies=[Depends(verify_api_key)])
async def invoke_function(request: MCPRequest):
try:
# In a real implementation, this would dispatch to the appropriate function
# This is a mock implementation for demonstration
if request.function_name == "get_weather":
# Mock weather data
return MCPResponse(
status="success",
data={"temperature": 22.5, "condition": "sunny"}
)
elif request.function_name == "search_database":
# Mock database search
return MCPResponse(
status="success",
data=[{"id": 1, "name": "Example result"}]
)
else:
return MCPResponse(
status="error",
data=None,
error=f"Unknown function: {request.function_name}"
)
except Exception as e:
return MCPResponse(
status="error",
data=None,
error=str(e)
)

@app.get("/health")
async def health_check():
return {"status": "healthy"}

# Run the server


if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
Docker Compose Setup
# docker-compose.yml
version: '3'

services:
mcp-server:
build:
context: .
dockerfile: Dockerfile
ports:
- "8000:8000"
environment:
- API_KEY=${API_KEY}
- DATABASE_URL=${DATABASE_URL}
- VECTOR_STORE_URL=${VECTOR_STORE_URL}
volumes:
- ./logs:/app/logs
restart: unless-stopped
depends_on:
- postgres
- redis

postgres:
image: postgres:14
environment:
- POSTGRES_USER=${POSTGRES_USER}
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
- POSTGRES_DB=${POSTGRES_DB}
volumes:
- postgres-data:/var/lib/postgresql/data
ports:
- "5432:5432"

redis:
image: redis:7
ports:
- "6379:6379"
volumes:
- redis-data:/data

volumes:
postgres-data:
redis-data:

# Dockerfile
FROM python:3.10-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["python", "mcp/server.py"]

Agent Types and Use Cases


Here are examples of different agent types and their implementations:

Retrieval Agent
from pydantic_ai import Agent, RunContext
from dataclasses import dataclass
from typing import List, Dict, Any

@dataclass
class VectorDBDeps:
collection_name: str
api_key: str

retrieval_agent = Agent(
'openai:gpt-4',
deps_type=VectorDBDeps,
system_prompt="""
You are a retrieval agent that helps users find relevant information.
Use the search_documents tool to find information related to the user's query.
Always cite your sources by including the document ID.
"""
)

@retrieval_agent.tool
async def search_documents(ctx: RunContext[VectorDBDeps], query: str, top_k: int = 3) ->
List[Dict[str, Any]]:
"""
Search for documents related to the query.

Args:
query: The search query
top_k: Maximum number of results to return (default: 3)

Returns:
A list of documents, each containing an ID, content, and relevance score
"""
# In a real implementation, this would query a vector database
# This is a mock implementation for demonstration
return [
{"id": "doc1", "content": "Information about topic A", "score": 0.92},
{"id": "doc2", "content": "Information about topic B", "score": 0.85},
{"id": "doc3", "content": "Information about topic C", "score": 0.78}
]

# Example usage
deps = VectorDBDeps(collection_name="knowledge_base", api_key="api-key")
result = retrieval_agent.run_sync(
"What information do we have about topic A?",
deps=deps
)

Planning Agent
from pydantic_ai import Agent
from pydantic import BaseModel, Field
from typing import List
from enum import Enum

class TaskStatus(Enum):
TODO = "todo"
IN_PROGRESS = "in_progress"
DONE = "done"

class Task(BaseModel):
description: str
status: TaskStatus = TaskStatus.TODO
dependencies: List[int] = Field(default_factory=list)

class Plan(BaseModel):
goal: str
tasks: List[Task]
estimated_completion_time: str
planning_agent = Agent(
'anthropic:claude-3-opus-20240229',
output_type=Plan,
system_prompt="""
You are a planning agent that helps users break down complex goals into actionable
tasks.
For each goal, create a detailed plan with:
1. A clear list of tasks
2. Dependencies between tasks (which tasks must be completed before others)
3. An estimated completion time

Be realistic and thorough in your planning.


"""
)

# Example usage
result = planning_agent.run_sync(
"I want to build a personal website to showcase my portfolio."
)

print(f"Goal: {result.data.goal}")
print(f"Estimated completion time: {result.data.estimated_completion_time}")
print("Tasks:")
for i, task in enumerate(result.data.tasks):
deps = f"Dependencies: {task.dependencies}" if task.dependencies else "No
dependencies"
print(f"{i+1}. {task.description} ({task.status.value}) - {deps}")

Conversational Agent
from pydantic_ai import Agent, RunContext
from dataclasses import dataclass
from typing import List, Dict, Optional

@dataclass
class ConversationHistory:
messages: List[Dict[str, str]]
user_name: str
preferences: Dict[str, str]

conversation_agent = Agent(
'openai:gpt-4',
deps_type=ConversationHistory,
system_prompt="""
You are a conversational agent that maintains context throughout a conversation.
Be friendly, helpful, and personalized to the user's preferences.
"""
)

@conversation_agent.system_prompt
def dynamic_prompt(ctx: RunContext[ConversationHistory]) -> str:
history = "\n".join([
f"User: {msg['user']}\nAssistant: {msg['assistant']}"
for msg in ctx.deps.messages[-5:] # Last 5 messages
])

return f"""
You are chatting with {ctx.deps.user_name}.

User preferences:
{ctx.deps.preferences}

Recent conversation history:


{history}

Maintain context from the conversation history and respond in a way that aligns with
Maintain context from the conversation history and respond in a way that aligns with
the user's preferences.
"""

# Example usage
history = ConversationHistory(
messages=[
{"user": "Hi there!", "assistant": "Hello! How can I help you today?"},
{"user": "I'm looking for a good book to read.", "assistant": "What genres do you
enjoy?"},
{"user": "I like science fiction and fantasy.", "assistant": "Great choices! Some
recommendations..."}
],
user_name="Alex",
preferences={"tone": "casual", "response_length": "detailed"}
)

response = conversation_agent.run_sync(
"Any recommendations for books like Dune?",
deps=history
)

# Update conversation history


history.messages.append({
"user": "Any recommendations for books like Dune?",
"assistant": response.output
})

RAG (Retrieval-Augmented Generation) Agent


from pydantic_ai import Agent, RunContext
from dataclasses import dataclass
from typing import List, Dict, Any

@dataclass
class RAGDependencies:
vector_db_url: str
api_key: str
user_id: str

rag_agent = Agent(
'openai:gpt-4',
deps_type=RAGDependencies,
system_prompt="""
You are a RAG (Retrieval-Augmented Generation) agent.

Process:
1. When a user asks a question, use the search_knowledge_base tool to retrieve
relevant information
2. Synthesize the retrieved information to provide a comprehensive answer
3. Always cite your sources by including document IDs
4. If the retrieved information is insufficient, acknowledge the limitations

Your goal is to provide accurate, well-sourced answers based on the retrieved


information.
"""
)

@rag_agent.tool
async def search_knowledge_base(ctx: RunContext[RAGDependencies], query: str, top_k: int =
5) -> List[Dict[str, Any]]:
"""
Search the knowledge base for information related to the query.

Args:
query: The search query
top_k: Maximum number of results to return (default: 5)
Returns:
A list of documents, each containing an ID, content, and relevance score
"""
# In a real implementation, this would query a vector database
# This is a mock implementation for demonstration
return [
{"id": "doc1", "content": "Detailed information about topic X...", "score": 0.95},
{"id": "doc2", "content": "Additional context about topic X...", "score": 0.87},
{"id": "doc3", "content": "Related information about topic Y...", "score": 0.82},
{"id": "doc4", "content": "Historical background on topic X...", "score": 0.78},
{"id": "doc5", "content": "Recent developments in topic X...", "score": 0.75}
]

# Example usage
deps = RAGDependencies(
vector_db_url="https://api.vectordb.example.com",
api_key="api-key",
user_id="user123"
)

result = rag_agent.run_sync(
"What is the latest information about topic X?",
deps=deps
)

Workflow Orchestration Agent


from pydantic_ai import Agent, RunContext
from pydantic import BaseModel, Field
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from enum import Enum

class WorkflowStatus(Enum):
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"

class WorkflowStep(BaseModel):
id: str
name: str
status: WorkflowStatus = WorkflowStatus.PENDING
result: Optional[Dict[str, Any]] = None
error: Optional[str] = None

class Workflow(BaseModel):
id: str
name: str
steps: List[WorkflowStep]
current_step_index: int = 0
status: WorkflowStatus = WorkflowStatus.PENDING
context: Dict[str, Any] = Field(default_factory=dict)

@dataclass
class WorkflowDependencies:
workflow: Workflow
services: Dict[str, Any]

workflow_agent = Agent(
'openai:gpt-4',
deps_type=WorkflowDependencies,
system_prompt="""
You are a workflow orchestration agent responsible for managing multi-step processes.

Your responsibilities:
1. Execute the current step in the workflow
2. Update the workflow state based on the results
3. Determine the next step to execute
4. Handle errors and provide recovery options

Follow the workflow definition precisely and maintain the workflow context.
"""
)

@workflow_agent.tool
async def execute_step(ctx: RunContext[WorkflowDependencies]) -> Dict[str, Any]:
"""
Execute the current step in the workflow.

Returns:
The result of the step execution
"""
workflow = ctx.deps.workflow
current_step = workflow.steps[workflow.current_step_index]

try:
# Update step status
current_step.status = WorkflowStatus.IN_PROGRESS

# In a real implementation, this would dispatch to the appropriate service


# This is a mock implementation for demonstration
if current_step.id == "data_extraction":
result = {"extracted_data": {"key1": "value1", "key2": "value2"}}
elif current_step.id == "data_transformation":
# Use data from previous step
input_data = workflow.steps[workflow.current_step_index -
1].result["extracted_data"]
result = {"transformed_data": {"processed_key1": input_data["key1"].upper()}}
elif current_step.id == "data_loading":
result = {"loaded_records": 42}
else:
raise ValueError(f"Unknown step: {current_step.id}")

# Update step with result


current_step.result = result
current_step.status = WorkflowStatus.COMPLETED

# Update workflow context with step result


workflow.context[current_step.id] = result

return {"success": True, "step_id": current_step.id, "result": result}

except Exception as e:
current_step.status = WorkflowStatus.FAILED
current_step.error = str(e)
return {"success": False, "step_id": current_step.id, "error": str(e)}

@workflow_agent.tool
async def advance_workflow(ctx: RunContext[WorkflowDependencies]) -> Dict[str, Any]:
"""
Advance the workflow to the next step.

Returns:
Information about the next step
"""
workflow = ctx.deps.workflow

# Check if we're at the end of the workflow


if workflow.current_step_index >= len(workflow.steps) - 1:
workflow.status = WorkflowStatus.COMPLETED
return {"success": True, "workflow_completed": True}

# Advance to next step


workflow.current_step_index += 1
next_step = workflow.steps[workflow.current_step_index]

return {
"success": True,
"next_step_id": next_step.id,
"next_step_name": next_step.name
}

# Example usage
workflow = Workflow(
id="data_pipeline_1",
name="Data Processing Pipeline",
steps=[
WorkflowStep(id="data_extraction", name="Extract Data from Source"),
WorkflowStep(id="data_transformation", name="Transform Data"),
WorkflowStep(id="data_loading", name="Load Data to Destination")
]
)

deps = WorkflowDependencies(
workflow=workflow,
services={} # In a real implementation, this would contain service instances
)

# Run the workflow orchestration


async def run_workflow():
while workflow.status != WorkflowStatus.COMPLETED and workflow.status !=
WorkflowStatus.FAILED:
# Execute current step
result = await workflow_agent.run(
f"Execute step: {workflow.steps[workflow.current_step_index].name}",
deps=deps
)

# Check if we need to advance the workflow


if workflow.steps[workflow.current_step_index].status == WorkflowStatus.COMPLETED:
advance_result = await workflow_agent.run(
"Advance to next step",
deps=deps
)

if "workflow_completed" in advance_result.output:
print("Workflow completed successfully!")
break

return workflow

End-to-End Example
Here’s a complete end-to-end example of a simple customer support agent platform:

# main.py
import asyncio
import uvicorn
from fastapi import FastAPI, HTTPException, Depends, Header
from pydantic import BaseModel, Field
from typing import List, Dict, Any, Optional
from enum import Enum
from dataclasses import dataclass

from pydantic_ai import Agent, RunContext

# Models
class SupportCategory(Enum):
BILLING = "billing"
TECHNICAL = "technical"
ACCOUNT = "account"
OTHER = "other"

class SupportRequest(BaseModel):
customer_id: str
message: str

class SupportResponse(BaseModel):
response: str
category: SupportCategory
escalate: bool = False
follow_up_actions: List[str] = Field(default_factory=list)

# Services
@dataclass
class CustomerDatabase:
async def get_customer(self, customer_id: str) -> Dict[str, Any]:
# Mock implementation
return {
"id": customer_id,
"name": "John Doe",
"email": "john@example.com",
"account_type": "premium",
"billing_status": "active"
}

async def get_customer_tickets(self, customer_id: str) -> List[Dict[str, Any]]:


# Mock implementation
return [
{"id": "ticket1", "status": "resolved", "category": "billing", "created_at":
"2023-01-15"}
]

@dataclass
class KnowledgeBase:
async def search(self, query: str, top_k: int = 3) -> List[Dict[str, Any]]:
# Mock implementation
return [
{"id": "kb1", "title": "Billing FAQ", "content": "Information about
billing..."},
{"id": "kb2", "title": "Technical Support Guide", "content": "Troubleshooting
steps..."}
]

@dataclass
class SupportDependencies:
customer_db: CustomerDatabase
knowledge_base: KnowledgeBase
customer_id: str

# Agent
support_agent = Agent(
'openai:gpt-4',
deps_type=SupportDependencies,
output_type=SupportResponse,
system_prompt="""
You are a customer support agent for our company.

Your responsibilities:
1. Respond to customer inquiries in a helpful and professional manner
2. Categorize the inquiry appropriately
3. Determine if the issue needs to be escalated to a human agent
4. Suggest follow-up actions if applicable

Use the available tools to gather information about the customer and relevant
knowledge base articles.
"""
)
@support_agent.tool
async def get_customer_info(ctx: RunContext[SupportDependencies]) -> Dict[str, Any]:
"""
Get information about the customer.

Returns:
Customer information including name, email, account type, and billing status
"""
return await ctx.deps.customer_db.get_customer(ctx.deps.customer_id)

@support_agent.tool
async def get_customer_history(ctx: RunContext[SupportDependencies]) -> List[Dict[str,
Any]]:
"""
Get the customer's support ticket history.

Returns:
List of previous support tickets
"""
return await ctx.deps.customer_db.get_customer_tickets(ctx.deps.customer_id)

@support_agent.tool
async def search_knowledge_base(ctx: RunContext[SupportDependencies], query: str) ->
List[Dict[str, Any]]:
"""
Search the knowledge base for relevant articles.

Args:
query: The search query

Returns:
List of relevant knowledge base articles
"""
return await ctx.deps.knowledge_base.search(query)

# FastAPI app
app = FastAPI(title="Customer Support Agent API")

@app.post("/support", response_model=SupportResponse)
async def handle_support_request(
request: SupportRequest,
authorization: Optional[str] = Header(None)
):
# In a real implementation, validate the authorization token
if not authorization:
raise HTTPException(status_code=401, detail="Unauthorized")

# Initialize services
customer_db = CustomerDatabase()
knowledge_base = KnowledgeBase()

# Create dependencies
deps = SupportDependencies(
customer_db=customer_db,
knowledge_base=knowledge_base,
customer_id=request.customer_id
)

# Run the agent


result = await support_agent.run(request.message, deps=deps)

return result.data

if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)

Testing and Debugging


Here are some examples of how to test and debug your Pydantic AI agents:

Unit Testing Agents


# tests/test_agents.py
import pytest
import asyncio
from unittest.mock import AsyncMock, patch
from dataclasses import dataclass

from pydantic_ai import Agent, RunContext


from pydantic import BaseModel

# Models for testing


class TestOutput(BaseModel):
result: str
score: float

@dataclass
class TestDeps:
value: str

# Test agent
@pytest.fixture
def test_agent():
agent = Agent(
'openai:gpt-4',
deps_type=TestDeps,
output_type=TestOutput,
system_prompt="You are a test agent."
)

@agent.tool
async def test_tool(ctx: RunContext[TestDeps], input_value: str) -> str:
return f"Processed: {input_value} with {ctx.deps.value}"

return agent

# Tests
@pytest.mark.asyncio
async def test_agent_run(test_agent):
# Mock the LLM call to avoid actual API calls during testing
with patch('pydantic_ai.agent.Agent._call_llm', new_callable=AsyncMock) as mock_call:
# Set up the mock to return a valid response
mock_call.return_value = {
"output": "This is a test response",
"data": {"result": "Success", "score": 0.95}
}

# Run the agent


result = await test_agent.run("Test input", deps=TestDeps(value="test_value"))

# Assertions
assert result.output == "This is a test response"
assert result.data.result == "Success"
assert result.data.score == 0.95

# Verify the LLM was called with the right parameters


mock_call.assert_called_once()
call_args = mock_call.call_args[0]
assert "Test input" in call_args[0] # Check user input
assert "You are a test agent" in call_args[1] # Check system prompt

@pytest.mark.asyncio
async def test_tool_execution(test_agent):
# Test that the tool works correctly
ctx = RunContext(deps=TestDeps(value="test_context"))
result = await test_agent.tools["test_tool"](ctx, "test_input")
assert result == "Processed: test_input with test_context"

Integration Testing
# tests/test_integration.py
import pytest
import asyncio
from fastapi.testclient import TestClient
from unittest.mock import patch, AsyncMock

from main import app, support_agent

# Test client
@pytest.fixture
def client():
return TestClient(app)

# Tests
def test_support_endpoint(client):
# Mock the agent run method to avoid actual LLM calls
with patch('pydantic_ai.agent.Agent.run', new_callable=AsyncMock) as mock_run:
# Set up the mock to return a valid response
mock_run.return_value.data = {
"response": "I can help with your billing issue.",
"category": "billing",
"escalate": False,
"follow_up_actions": ["Check billing status"]
}

# Make the request


response = client.post(
"/support",
json={"customer_id": "customer123", "message": "I have a billing question"},
headers={"Authorization": "Bearer test-token"}
)

# Assertions
assert response.status_code == 200
data = response.json()
assert data["response"] == "I can help with your billing issue."
assert data["category"] == "billing"
assert data["escalate"] is False
assert "Check billing status" in data["follow_up_actions"]

Debugging Tips

1. Enable Verbose Logging:

import logging
logging.basicConfig(level=logging.DEBUG)

# Create a logger for your application


logger = logging.getLogger("pydantic_ai_app")
logger.setLevel(logging.DEBUG)

# Add a file handler


file_handler = logging.FileHandler("app.log")
file_handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)

# Use in your agent


@agent.tool
async def debug_tool(ctx: RunContext[Dependencies], input_value: str) -> str:
logger.debug(f"Tool called with input: {input_value}")
result = process_input(input_value)
logger.debug(f"Tool result: {result}")
return result

2. Inspect Agent Execution Graph:

from pydantic_ai import Agent

agent = Agent('openai:gpt-4', system_prompt="Debug agent")

# Enable graph visualization


agent.debug_mode = True

# Run the agent


result = agent.run_sync("Test input")

# The execution graph will be saved to a file that you can visualize
print(f"Execution graph saved to: {agent.last_execution_graph_path}")

Deployment Considerations
Environment Variables
# config/settings.py
import os
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
# API keys
openai_api_key: str = os.getenv("OPENAI_API_KEY", "")
anthropic_api_key: str = os.getenv("ANTHROPIC_API_KEY", "")
google_api_key: str = os.getenv("GOOGLE_API_KEY", "")

# Database
database_url: str = os.getenv("DATABASE_URL", "postgresql://user:pass@localhost/db")

# MCP Server
mcp_server_host: str = os.getenv("MCP_SERVER_HOST", "0.0.0.0")
mcp_server_port: int = int(os.getenv("MCP_SERVER_PORT", "8000"))
mcp_api_key: str = os.getenv("MCP_API_KEY", "default-key")

# Logging
log_level: str = os.getenv("LOG_LEVEL", "INFO")

class Config:
env_file = ".env"

# Create settings instance


settings = Settings()

Containerization
# Dockerfile
FROM python:3.10-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code


COPY . .
# Set environment variables
ENV PYTHONPATH=/app
ENV PYTHONUNBUFFERED=1

# Run the application


CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

Kubernetes Deployment
# kubernetes/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: pydantic-ai-platform
labels:
app: pydantic-ai-platform
spec:
replicas: 3
selector:
matchLabels:
app: pydantic-ai-platform
template:
metadata:
labels:
app: pydantic-ai-platform
spec:
containers:
- name: pydantic-ai-platform
image: your-registry/pydantic-ai-platform:latest
ports:
- containerPort: 8000
env:
- name: OPENAI_API_KEY
valueFrom:
secretKeyRef:
name: api-keys
key: openai-api-key
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: database-credentials
key: url
resources:
limits:
cpu: "1"
memory: "1Gi"
requests:
cpu: "500m"
memory: "512Mi"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 5
periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
name: pydantic-ai-platform
spec:
selector:
app: pydantic-ai-platform
ports:
- port: 80
targetPort: 8000
type: ClusterIP

Appendix: Advanced Patterns


Chained Agents with Dependency Injection
from pydantic_ai import Agent, RunContext
from dataclasses import dataclass
from typing import List, Dict, Any
from pydantic import BaseModel

# Models
class ResearchTopic(BaseModel):
title: str
keywords: List[str]
description: str

class ResearchOutline(BaseModel):
sections: List[Dict[str, Any]]
references: List[str]

class ResearchReport(BaseModel):
title: str
introduction: str
sections: List[Dict[str, str]]
conclusion: str
references: List[str]

# Dependencies
@dataclass
class ResearchDeps:
api_key: str
user_id: str

# Agents
topic_agent = Agent(
'openai:gpt-4',
deps_type=ResearchDeps,
output_type=ResearchTopic,
system_prompt="You help users define research topics."
)

outline_agent = Agent(
'anthropic:claude-3-opus-20240229',
deps_type=ResearchDeps,
output_type=ResearchOutline,
system_prompt="You create detailed outlines for research reports."
)

report_agent = Agent(
'openai:gpt-4',
deps_type=ResearchDeps,
output_type=ResearchReport,
system_prompt="You write comprehensive research reports based on outlines."
)

# Workflow function
async def generate_research_report(query: str, deps: ResearchDeps) -> ResearchReport:
# Step 1: Generate research topic
topic_result = await topic_agent.run(
f"Generate a research topic based on: {query}",
deps=deps
)
topic = topic_result.data

# Step 2: Create outline based on topic


outline_prompt = f"""
Create a detailed outline for a research report on:
Title: {topic.title}
Description: {topic.description}
Keywords: {', '.join(topic.keywords)}
"""

outline_result = await outline_agent.run(outline_prompt, deps=deps)


outline = outline_result.data

# Step 3: Write full report based on outline


report_prompt = f"""
Write a comprehensive research report based on this outline:
Title: {topic.title}

Sections:
{outline.sections}

References to include:
{outline.references}
"""

report_result = await report_agent.run(report_prompt, deps=deps)


return report_result.data

Streaming Responses
from pydantic_ai import Agent
import asyncio

streaming_agent = Agent(
'openai:gpt-4',
system_prompt="You are a storytelling agent that creates engaging narratives.",
streaming=True # Enable streaming
)

# Synchronous streaming
for chunk in streaming_agent.stream_sync("Tell me a short story about a robot learning to
paint."):
print(chunk, end="", flush=True)

# Asynchronous streaming
async def stream_story():
async for chunk in streaming_agent.stream("Tell me a short story about a robot
learning to paint."):
print(chunk, end="", flush=True)
await asyncio.sleep(0.01) # Small delay for demonstration

# Run the async function


asyncio.run(stream_story())

Custom Model Provider


from pydantic_ai.models.base import BaseModel, ModelResponse
from typing import Dict, Any, Optional, List
import aiohttp

class CustomLLMModel(BaseModel):
"""Custom LLM model implementation."""

def __init__(self, model_name: str, api_key: str, api_url: str):


super().__init__(model_name)
self.api_key = api_key
self.api_url = api_url

async def generate(


self,
messages: List[Dict[str, str]],
temperature: float = 0.7,
max_tokens: Optional[int] = None,
stop: Optional[List[str]] = None,
**kwargs
) -> ModelResponse:
"""Generate a response from the custom LLM."""
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}

payload = {
"messages": messages,
"temperature": temperature,
"max_tokens": max_tokens,
"stop": stop,
**kwargs
}

async with aiohttp.ClientSession() as session:


async with session.post(self.api_url, headers=headers, json=payload) as
response:
if response.status != 200:
error_text = await response.text()
raise Exception(f"API error: {response.status} - {error_text}")

data = await response.json()

return ModelResponse(
content=data["choices"][0]["message"]["content"],
model=self.model_name,
usage={
"prompt_tokens": data["usage"]["prompt_tokens"],
"completion_tokens": data["usage"]["completion_tokens"],
"total_tokens": data["usage"]["total_tokens"]
},
raw_response=data
)

# Using the custom model


custom_model = CustomLLMModel(
model_name="custom-llm",
api_key="your-api-key",
api_url="https://api.custom-llm-provider.com/v1/chat/completions"
)

agent = Agent(
custom_model,
system_prompt="You are a helpful assistant."
)

response = agent.run_sync("Hello, how are you?")


print(response.output)

This implementation guide provides a comprehensive set of examples and guidelines


for building an AI agent platform using Pydantic AI. By following these examples,
developers can create sophisticated agent-based applications that leverage the power
of large language models while maintaining type safety, modularity, and production-
grade quality.

You might also like