Sanaz
06/28/2025, 4:43 PM
Marvin
06/28/2025, 4:43 PM
Marvin
06/28/2025, 4:44 PM
python
from prefect import flow, task
from marvin import ai_fn, ai_model
from marvin.tools.search import Search
from marvin.tools.file_reader import FileReader
import json

# Configure specialized agents for different tasks
@ai_model
class WebResearchAgent:
    """Agent specialized in web research and information synthesis."""
    llm_model = "gpt-4"
    temperature = 0.3

    @ai_fn
    def search_and_synthesize(self, query: str) -> str:
        """Search the web and synthesize findings."""
        pass

@ai_model
class FileProcessingAgent:
    """Agent specialized in processing and analyzing different file types."""
    llm_model = "gpt-4"
    temperature = 0.2

    @ai_fn
    def analyze_file(self, file_content: str, file_type: str) -> dict:
        """Analyze file content and extract key information."""
        pass

@task
def perform_web_research(query: str) -> str:
    """Task to perform web research using the WebResearchAgent."""
    agent = WebResearchAgent()
    search_tool = Search()
    # Perform search and get results
    search_results = search_tool.search(query)
    # Have the agent synthesize the results
    synthesis = agent.search_and_synthesize(search_results)
    return synthesis

@task
def process_file(file_path: str) -> dict:
    """Task to process different file types using the FileProcessingAgent."""
    agent = FileProcessingAgent()
    file_reader = FileReader()
    # Read the file content
    file_content = file_reader.read(file_path)
    file_type = file_path.split('.')[-1]
    # Have the agent analyze the content
    analysis = agent.analyze_file(file_content, file_type)
    return analysis

@flow
def research_and_analyze_flow(
    search_query: str,
    file_paths: list[str]
) -> dict:
    """
    Main workflow that coordinates web research and file processing.

    Args:
        search_query: The query to research online
        file_paths: List of paths to files that need to be processed
    """
    # Perform web research
    web_results = perform_web_research(search_query)
    # Process all files
    file_results = []
    for file_path in file_paths:
        result = process_file(file_path)
        file_results.append(result)
    # Combine results
    combined_results = {
        "web_research": web_results,
        "file_analysis": file_results
    }
    return combined_results

# Example usage
if __name__ == "__main__":
    query = "Latest developments in AI and data engineering"
    files_to_process = [
        "data/report.pdf",
        "data/analysis.xlsx",
        "data/notes.txt"
    ]
    results = research_and_analyze_flow(query, files_to_process)
    print(json.dumps(results, indent=2))
This example demonstrates several key concepts:
1. Specialized Agents: We create two distinct AI agents:
- `WebResearchAgent`: Focused on web research and synthesis
- `FileProcessingAgent`: Specialized in processing different file types
2. Task Separation: Each major operation is separated into its own task:
- `perform_web_research`: Handles web searching and result synthesis
- `process_file`: Manages file reading and analysis
3. Flow Orchestration: The main flow `research_and_analyze_flow` coordinates the entire process, managing both web research and file processing tasks.
4. Tool Integration: The workflow uses Marvin's built-in tools:
- `Search`: For web searching capabilities
- `FileReader`: For handling different file types
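One detail worth noting in `process_file`: it derives the file type with `file_path.split('.')[-1]`, which returns the whole path when a file has no extension. Below is a minimal sketch of a more defensive variant using only the standard library. The helper name `detect_file_type` is hypothetical and is not part of Marvin or Prefect.

```python
from pathlib import Path


def detect_file_type(file_path: str) -> str:
    """Return the lowercase extension without the dot, or "" if there is none."""
    # Path.suffix only inspects the final path component, so dots in
    # directory names are ignored.
    return Path(file_path).suffix.lstrip(".").lower()


for path in ["data/report.pdf", "data/analysis.XLSX", "data/notes"]:
    print(path, "->", repr(detect_file_type(path)))
```

Returning an empty string for extension-less files lets the caller decide how to handle them instead of passing a misleading "type" to the agent.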
To use this workflow, you'll need to:
1. Install the required dependencies:
```bash
pip
```
Sanaz
06/30/2025, 12:44 PM
Marvin
06/30/2025, 12:44 PM
Marvin
06/30/2025, 12:45 PM
bash
# Core requirements
pip install "prefect>=3.0.0"
pip install "marvin>=1.0.0"
pip install "openai>=1.0.0" # Required for GPT model access
# For web search capabilities
pip install "duckduckgo-search>=3.0.0" # For web searching
pip install "beautifulsoup4>=4.9.0" # For parsing web content
# For file processing
pip install "python-magic>=0.4.27" # For file type detection
pip install "pypdf>=3.0.0" # For PDF processing
pip install "openpyxl>=3.0.0" # For Excel files
pip install "pandas>=1.0.0" # For structured data processing
Additionally, you'll need to:
1. Set up your OpenAI API key:
bash
export OPENAI_API_KEY="your-api-key-here"
Or in your Python code:
python
import os
os.environ["OPENAI_API_KEY"] = "your-api-key-here"
2. Optional but recommended - create a virtual environment:
bash
# Create a virtual environment
python -m venv venv
# Activate it
# On Windows:
venv\Scripts\activate
# On Unix or MacOS:
source venv/bin/activate
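Since every agent call depends on the API key from step 1, a small start-up check can surface a missing key before any flow runs. This is a minimal sketch using only the standard library; the function name `require_api_key` is an assumption made for illustration.

```python
import os


def require_api_key(name: str = "OPENAI_API_KEY") -> str:
    """Return the value of the environment variable, or raise if it is unset or blank."""
    value = os.environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"{name} is not set; export it before running the workflow")
    return value


if __name__ == "__main__":
    try:
        require_api_key()
        print("API key found")
    except RuntimeError as exc:
        print(exc)
```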
Important Notes:
1. Version Compatibility:
- The versions specified are minimum versions. You can use newer versions, but be aware of potential breaking changes.
- If you encounter version conflicts, you might need to adjust the versions to find a compatible combination.
2. File Type Support:
- The dependencies listed above cover common file types (PDF, Excel, text files)
- If you need to process additional file types, you might need extra libraries:
  - `docx2txt` for Word documents
  - `pillow` for image processing
  - `python-pptx` for PowerPoint files
3. System Dependencies:
- `python-magic` might require additional system libraries on Windows
- On Linux, you might need to install `libmagic`:
bash
# Ubuntu/Debian
sudo apt-get install libmagic1
# CentOS/RHEL
sudo yum install file-devel
4. Memory Requirements:
- When processing large files or running multiple agents simultaneously, ensure your system has adequate RAM
- The GPT models run on OpenAI's servers, so choosing GPT-4 over GPT-3.5 does not change local memory use; it mainly affects cost and latency
To verify your installation, you can run this simple test:
python
import prefect
import marvin
import openai
import duckduckgo_search
import magic
import pypdf
import pandas
print("All core dependencies successfully imported!")
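The test above stops at the first package that fails to import. Here is a minimal sketch that reports every missing module in one pass, using `importlib.util.find_spec` from the standard library. The helper name `missing_modules` is hypothetical, and the module list simply mirrors the imports above.

```python
from importlib.util import find_spec


def missing_modules(names: list[str]) -> list[str]:
    """Return the subset of top-level module names that cannot be found."""
    return [name for name in names if find_spec(name) is None]


required = ["prefect", "marvin", "openai", "duckduckgo_search", "magic", "pypdf", "pandas"]
missing = missing_modules(required)
if missing:
    print("Missing modules:", ", ".join(missing))
else:
    print("All core dependencies found")
```

Note that `find_spec` only locates a module without importing it, so it will not catch a package that is installed but fails at import time (for example `magic` without the `libmagic` system library).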
If you plan to deploy this workflow in a production environment, it's recommended to:
1. Create a `requirements.txt` file:
bash
pip freeze > requirements.txt
2. Or use a `pyproject.toml` or `setup.py` for more structured dependency management.
Would you like me to provide more specific information about any of these dependencies or help with troubleshooting the installation process?
Sanaz
06/30/2025, 12:57 PM
Marvin
06/30/2025, 12:57 PM
Marvin
06/30/2025, 12:58 PM
python
from marvin import ai_fn, ai_model
from prefect import flow, task

@ai_model
class ResearchAgent:
    llm_model = "gpt-4"

    @ai_fn
    def research_topic(self, topic: str) -> str:
        """Research a topic and return key findings."""
        pass

@ai_model
class OutlineAgent:
    llm_model = "gpt-4"

    @ai_fn
    def create_outline(self, research: str) -> list[str]:
        """Create an article outline based on research."""
        pass

@ai_model
class WriterAgent:
    llm_model = "gpt-4"

    @ai_fn
    def write_section(self, outline: str, research: str) -> str:
        """Write a section of the article."""
        pass

@flow
def article_creation_flow(topic: str):
    # Research phase
    researcher = ResearchAgent()
    research = researcher.research_topic(topic)
    # Outline phase
    outliner = OutlineAgent()
    outline = outliner.create_outline(research)
    # Writing phase
    writer = WriterAgent()
    sections = []
    for section in outline:
        content = writer.write_section(section, research)
        sections.append(content)
    return "\n\n".join(sections)
2. Advanced Web Search Example
Here's a more sophisticated web research workflow:
python
from marvin import ai_fn, ai_model
from prefect import flow, task
from duckduckgo_search import DDGS  # the older `ddg` helper was removed in later releases
from bs4 import BeautifulSoup
import requests
import asyncio

@ai_model
class WebSearchAgent:
    llm_model = "gpt-4"
    temperature = 0.3

    @ai_fn
    def generate_search_queries(self, topic: str) -> list[str]:
        """Generate multiple search queries to cover different aspects of the topic."""
        pass

    @ai_fn
    def analyze_webpage(self, content: str, url: str) -> dict:
        """Analyze webpage content and extract relevant information."""
        pass

    @ai_fn
    def synthesize_findings(self, results: list[dict]) -> str:
        """Synthesize findings from multiple sources into a coherent summary."""
        pass

@task
async def fetch_webpage(url: str) -> str:
    """Fetch and parse webpage content."""
    headers = {'User-Agent': 'Mozilla/5.0'}
    # requests is blocking; the timeout keeps one slow site from stalling the flow
    response = requests.get(url, headers=headers, timeout=10)
    soup = BeautifulSoup(response.text, 'html.parser')
    return soup.get_text()

@flow
async def advanced_research_flow(topic: str, max_sources: int = 5):
    agent = WebSearchAgent()
    # Generate multiple search queries
    queries = agent.generate_search_queries(topic)
    # Perform searches
    all_results = []
    for query in queries:
        # DDGS().text() yields dicts with 'title', 'href', and 'body' keys
        search_results = DDGS().text(query, max_results=max_sources)
        # Process each search result
        for result in search_results:
            content = await fetch_webpage(result['href'])
            analysis = agent.analyze_webpage(content, result['href'])
            all_results.append(analysis)
    # Synthesize findings
    final_summary = agent.synthesize_findings(all_results)
    return final_summary
3. Multi-Format Document Processing Example
```python
from marvin import ai_fn, ai_model
from prefect import flow, task
import pypdf
import pandas as pd
from typing import Union, Dict
@ai_model
class DocumentAnalysisAgent:
llm_model = "gpt-4"
@ai_fn
def analyze_
```
Sanaz
06/30/2025, 1:03 PM
Marvin
06/30/2025, 1:03 PM
Marvin
06/30/2025, 1:04 PM
The `@ai_model` decorator in Marvin is a class decorator that transforms a regular Python class into an AI-powered class with the following key features:
1. Model Configuration
python
@ai_model
class ResearchAgent:
# Configure which LLM to use
llm_model = "gpt-4" # or "gpt-3.5-turbo", etc.
# Optional: Configure model parameters
temperature = 0.7 # Controls randomness (0.0 - 1.0)
max_tokens = 1000 # Maximum response length
2. State Management: The decorator allows the class to maintain state and context across multiple AI function calls.
3. AI Function Integration: When combined with `@ai_fn`, it enables methods to be executed using the configured LLM:
python
@ai_model
class ContentAnalyzer:
llm_model = "gpt-4"
@ai_fn
def analyze_text(self, text: str) -> dict:
"""
The docstring becomes the instruction for the AI.
The method signature defines input/output types.
"""
pass # The actual implementation is handled by the LLM
4. Inheritance Support: AI models can inherit from other AI models:
python
@ai_model
class BaseAnalyzer:
llm_model = "gpt-4"
@ai_fn
def basic_analysis(self, text: str) -> str:
"""Perform basic text analysis."""
pass
@ai_model
class SpecializedAnalyzer(BaseAnalyzer):
@ai_fn
def detailed_analysis(self, text: str) -> dict:
"""Perform detailed domain-specific analysis."""
pass
5. Memory and Context:
python
@ai_model
class ConversationalAgent:
llm_model = "gpt-4"
def __init__(self):
self.conversation_history = []
@ai_fn
def respond(self, user_input: str) -> str:
"""Respond based on the conversation history."""
self.conversation_history.append(user_input)
return # AI generates response
Key Benefits:
1. Encapsulation: Groups related AI functions and their configuration in a single class
2. Reusability: AI models can be instantiated multiple times with different configurations
3. Type Safety: Enforces input/output types for AI functions
4. State Management: Maintains context across multiple function calls
5. Configuration Management: Centralizes LLM settings for related functions
Example of Different Configurations:
python
@ai_model
class CreativeWriter:
llm_model = "gpt-4"
temperature = 0.9 # Higher temperature for more creative outputs
@ai_fn
def write_story(self, prompt: str) -> str:
"""Write a creative story based on the prompt."""
pass
@ai_model
class TechnicalWriter:
llm_model = "gpt-4"
temperature = 0.2 # Lower temperature for more focused, precise outputs
@ai_fn
def write_documentation(self, code: str) -> str:
"""Write technical documentation for the given code."""
pass
Common Patterns:
1. Task-Specific Agents:
python
@ai_model
class ResearchAgent:
llm_model = "gpt-4"
@ai_fn
def search(self, query: str) -> list[str]:
"""Generate search queries."""
pass
@ai_fn
def synthesize(self, results: list[str]) -> str:
"""Synthesize search results."""
pass
2. Chained Processing:
```python
@ai_model
class DocumentProcessor:
llm_model = "gpt-4"
@ai_fn
def extract_topics(self, text: str) -> list[str]:
"""Extract main topics from text."""
pass
@ai_fn
def summarize_topic(self, topic
```
Sanaz
06/30/2025, 1:09 PM
Marvin
06/30/2025, 1:10 PM
Marvin
06/30/2025, 1:11 PM
python
# Instead of
from marvin import ai_fn, ai_model # This won't work
# Use this instead
from marvin import fn as ai_fn # The decorator is now called 'fn'
from marvin import Agent as ai_model # The base class is now called 'Agent'
Here's a corrected example:
python
from marvin import fn as ai_fn
from marvin import Agent
from prefect import flow, task

@Agent
class ResearchAgent:
    model = "openai:gpt-4"  # Specify the model using provider:model_name format

    @ai_fn
    def research_topic(self, topic: str) -> str:
        """Research a topic and return key findings."""
        pass

# Example usage
@flow
def research_flow(topic: str):
    agent = ResearchAgent()
    results = agent.research_topic(topic)
    return results
If you're still having issues, make sure you:
1. Have the latest version of Marvin installed:
bash
pip install --upgrade marvin
2. Have set up your OpenAI API key:
python
import os
os.environ["OPENAI_API_KEY"] = "your-api-key-here"
3. Have all required dependencies:
bash
pip install "marvin[all]" # Installs all optional dependencies
Alternative Import Patterns:
python
# Option 1: Import everything
import marvin

@marvin.Agent
class MyAgent:
    model = "openai:gpt-4"

    @marvin.fn
    def my_function(self, input: str) -> str:
        """Process the input."""
        pass

# Option 2: Direct imports
from marvin import Agent, fn

@Agent
class MyAgent:
    model = "openai:gpt-4"

    @fn
    def my_function(self, input: str) -> str:
        """Process the input."""
        pass
The main changes to note are:
- `ai_fn` is now `fn`
- `ai_model` is now `Agent`
- Model specification uses the format `provider:model_name`
- Configuration is more streamlined with environment variables
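To make the `provider:model_name` convention concrete, here is a minimal sketch that splits such a string into its two parts. The helper `split_model_spec` is hypothetical and is not a Marvin function; it only illustrates the naming format described above.

```python
def split_model_spec(spec: str) -> tuple[str, str]:
    """Split "provider:model_name" into (provider, model_name)."""
    provider, sep, model_name = spec.partition(":")
    if not sep or not provider or not model_name:
        raise ValueError(f"expected 'provider:model_name', got {spec!r}")
    return provider, model_name


print(split_model_spec("openai:gpt-4"))
```

A bare name such as `"gpt-4"` raises `ValueError` here, which mirrors the point that the provider prefix is expected.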
Would you like me to provide a complete working example with these updated imports?
Sanaz
06/30/2025, 1:25 PM
Marvin
06/30/2025, 1:25 PM
Marvin
06/30/2025, 1:26 PM
bash
# Create and navigate to project directory
mkdir marvin-workflow
cd marvin-workflow
# Create virtual environment
python -m venv venv
# Activate virtual environment
# On Windows:
venv\Scripts\activate
# On Unix or MacOS:
source venv/bin/activate
2. Install required packages:
bash
pip install "marvin[all]"
pip install "prefect>=3.0.0"
3. Create a `.env` file in your project directory:
env
OPENAI_API_KEY=your-api-key-here
MARVIN_AGENT_MODEL=openai:gpt-4
4. Create a new file called `workflow.py` with this example code:
python
import os
# load_dotenv comes from the python-dotenv package; install it if it is not already present
from dotenv import load_dotenv
from marvin import fn
from marvin import Agent
from prefect import flow, task

# Load environment variables
load_dotenv()

@Agent
class ResearchAgent:
    model = "openai:gpt-4"

    @fn
    def research_topic(self, topic: str) -> str:
        """Research the given topic and provide detailed findings."""
        pass

@Agent
class SummaryAgent:
    model = "openai:gpt-4"

    @fn
    def create_summary(self, research: str) -> str:
        """Create a concise summary of the research findings."""
        pass

@task
def perform_research(topic: str) -> str:
    agent = ResearchAgent()
    return agent.research_topic(topic)

@task
def create_summary(research: str) -> str:
    agent = SummaryAgent()
    return agent.create_summary(research)

@flow(name="Research Flow")
def research_flow(topic: str):
    # Perform the research
    research_results = perform_research(topic)
    # Create a summary
    summary = create_summary(research_results)
    return {
        "research": research_results,
        "summary": summary
    }

if __name__ == "__main__":
    # Example usage
    topic = "The impact of artificial intelligence on modern workflow automation"
    results = research_flow(topic)
    print("\n=== Research Results ===")
    print(results["research"])
    print("\n=== Summary ===")
    print(results["summary"])
5. Run the workflow:
There are several ways to run the workflow:
A. Direct Python execution:
bash
python workflow.py
B. Using Prefect CLI (recommended for production):
bash
# Start the Prefect server (in a separate terminal)
prefect server start
# In another terminal, start a worker
prefect worker start -p process
# Deploy your flow
prefect deploy workflow.py:research_flow -n "research-deployment" -p process
# Run the deployment
prefect deployment run research-flow/research-deployment
C. Interactive Python shell:
python
>>> from workflow import research_flow
>>> results = research_flow("The future of quantum computing")
>>> print(results["summary"])
6. Monitor your flow:
- Open the Prefect UI at http://127.0.0.1:4200 (if using Prefect server)
- View flow runs, logs, and results
Important Tips:
1. Error Handling:
python
from prefect import flow, task

# Retries are configured on the task decorator; no separate import is needed
@task(retries=3, retry_delay_seconds=60)
def perform_research(topic: str) -> str:
    try:
        agent = ResearchAgent()
        return agent.research_topic(topic)
    except Exception as e:
        print(f"Error during research: {e}")
        raise  # re-raise so Prefect sees the failure and can retry
2. Logging:
```python
from prefect import get_run_logger
@task
def perform_research(topic: str) -> str:
logger = get_run_logger()
logger.info(f"Starting research on topic: {topic}")
agent = ResearchAgent()
```
Sanaz
06/30/2025, 1:53 PM
Marvin
06/30/2025, 1:53 PM
Marvin
06/30/2025, 1:54 PM
bash
# Start the Prefect server
prefect server start
This will:
- Start a local database (SQLite by default; PostgreSQL requires separate configuration)
- Launch the API server
- Start the UI server (typically at http://127.0.0.1:4200)
2. Configure Client Connection
bash
# Point your client to the local server
prefect config set PREFECT_API_URL="http://127.0.0.1:4200/api"
3. Create a Work Pool
bash
# Create a process work pool
prefect work-pool create my-process-pool --type process
# Create a kubernetes work pool (if using k8s)
prefect work-pool create my-k8s-pool --type kubernetes
4. Example Workflow Setup
Create a file `example_flow.py`:
python
from prefect import flow, task
from datetime import timedelta

@task(retries=3, retry_delay_seconds=60)
def process_data(data: int) -> int:
    return data * 2

@flow(name="Example Flow", description="A simple example flow")
def my_flow(number: int = 1):
    result = process_data(number)
    return result

if __name__ == "__main__":
    my_flow(5)
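The `retries=3, retry_delay_seconds=60` arguments above ask Prefect to re-run a failed task. As a rough mental model only, and not Prefect's actual implementation, the behaviour resembles the plain-Python loop below. The names `run_with_retries` and `flaky` are hypothetical, and the delay is shortened so the sketch finishes quickly.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(fn: Callable[[], T], retries: int = 3, delay_seconds: float = 0.0) -> T:
    """Call fn once, then up to `retries` more times if it raises."""
    for attempt in range(retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == retries:
                raise  # out of retries: surface the last error
            time.sleep(delay_seconds)
    raise AssertionError("unreachable for retries >= 0")


calls = {"count": 0}


def flaky() -> int:
    """Fail on the first two calls, then succeed."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return 10


print(run_with_retries(flaky, retries=3, delay_seconds=0.01))
```

With `retries=3` a task can therefore run up to four times in total: the first attempt plus three retries.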
5. Deploy the Flow
python
from pathlib import Path
from prefect import flow

# Prefect 3 no longer provides prefect.deployments.Deployment; for a process
# work pool, load the flow from local source and call deploy() instead.
if __name__ == "__main__":
    flow.from_source(
        source=str(Path(__file__).parent),
        entrypoint="example_flow.py:my_flow",
    ).deploy(
        name="example-deployment",
        work_pool_name="my-process-pool",
        cron="0 0 * * *",  # Run daily at midnight
        tags=["example"],
    )
6. Start a Worker
bash
# Start a worker for your work pool
prefect worker start -p "my-process-pool"
7. Monitor and Manage
Access the UI at http://127.0.0.1:4200 to:
- Monitor flow runs
- View logs
- Manage deployments
- Configure work pools
- Set up notifications
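If the UI does not load, a quick first check is whether anything is listening on the server port. Below is a minimal sketch using only the standard library. `is_port_open` is a hypothetical helper, and the host and port are the defaults mentioned above.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts
        return False


if is_port_open("127.0.0.1", 4200):
    print("Something is listening on port 4200")
else:
    print("Nothing is listening on port 4200; is the Prefect server running?")
```

A successful connection only shows that some process holds the port; it does not confirm that the process is the Prefect server.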
8. Common Operations
python
import asyncio
from prefect import get_client
from prefect.deployments import run_deployment

async def manage_flows():
    # Create API client
    async with get_client() as client:
        # Get flow runs
        flow_runs = await client.read_flow_runs()
        # Start a deployment
        deployment_run = await run_deployment(
            name="example-flow/example-deployment",
            parameters={"number": 10}
        )
        # Get deployment details (read_deployment expects an ID, so look it up by name)
        deployment = await client.read_deployment_by_name("example-flow/example-deployment")

if __name__ == "__main__":
    asyncio.run(manage_flows())
9. Environment Variables
Create a `.env` file:
env
PREFECT_API_URL="http://127.0.0.1:4200/api"
PREFECT_SERVER_API_HOST="0.0.0.0"
PREFECT_SERVER_API_PORT="4200"
10. Production Configuration
For production, consider:
bash
# Set database URL
prefect config set PREFECT_API_DATABASE_CONNECTION_URL="postgresql+asyncpg://user:pass@hostname:5432/db"
# Set API URL
prefect config set PREFECT_API_URL="http://your-server:4200/api"
11. Server Management Commands
bash
# Check server status
prefect server status
# View server settings
prefect config view
# Clear server database (be careful!)
prefect server database reset
# Start server in development mode
prefect server start --dev
12. Troubleshooting
If you encounter issues:
bash
# Check server logs
prefect server logs
# Verify configuration
prefect config view
# Reset configuration
prefect config unset PREFECT_API_URL
# Check worker status
prefect worker status
Common Issues and Solutions:
- If the UI isn't accessible, check if port 4200 is available
- If workers can't connect
Sanaz
06/30/2025, 3:41 PM
Marvin
06/30/2025, 3:41 PM
Marvin
06/30/2025, 3:48 PM
bash
pip install "marvin[all]"
pip install prefect
pip install duckduckgo-search
pip install beautifulsoup4
pip install requests
2. Create a file named `web_search_agent.py`:
```python
from typing import List, Dict
from marvin import Agent, fn
from prefect import flow, task
from duckduckgo_search import ddg
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse
import time
@Agent
class WebSearchAgent:
model = "openai:gpt-4"
temperature = 0.3 # Lower temperature for more focused results
@fn
def generate_search_queries(self, topic: str) -> List[str]:
"""
Generate a list of specific search queries based on the main topic.
The queries should cover different aspects of the topic.
Return a list of 3-5 specific search queries.
"""
pass
@fn
def analyze_search_results(self,
search_results: List[Dict],
original_query: str) -> List[Dict]:
"""
Analyze search results and identify the most relevant ones.
Filter out any irrelevant or low-quality results.
Args:
search_results: List of search results with titles and descriptions
original_query: The original search query
Returns:
List of filtered and ranked results with relevance scores
"""
pass
@fn
def synthesize_information(self,
webpage_contents: List[Dict],
query: str) -> Dict:
"""
Synthesize information from multiple webpage contents into a coherent summary.
Args:
webpage_contents: List of dictionaries containing webpage content and metadata
query: Original search query
Returns:
Dictionary containing:
- summary: Overall summary of findings
- key_points: List of key points
- sources: List of sources used
"""
pass
@task(retries=2, retry_delay_seconds=30)
def fetch_webpage_content(url: str) -> Dict:
"""Fetch and extract content from a webpage."""
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
# Remove unwanted elements
for element in soup(['script', 'style', 'nav', 'footer', 'iframe']):
element.decompose()
# Extract main content
content = ' '.join(soup.stripped_strings)
return {
'url': url,
'domain': urlparse(url).netloc,
'content': content[:8000], # Limit content length
'status': 'success'
}
except Exception as e:
return {
'url': url,
'domain': urlparse(url).netloc,
'content': '',
'status': f'error: {str(e)}'
}
@task
def perform_web_search(query: str, max_results: int = 5) -> List[Dict]:
"""Perform web search using DuckDuckGo."""
try:
results = ddg(query, max_results=max_results)
return list(results)
except Exception as e:
print(f"Search error: {e}")
return []
@flow(name="Web Research Flow")
def web_research_flow(
topic: str,
max_results_per_query: int = 3,
max_queries: int = 3
) -> Dict:
"""
Orchestrates the web research process.
Args:
topic: The main topic to research
max_results_per_query: Maximum number of results to
```
Sanaz
06/30/2025, 3:57 PM
Marvin
06/30/2025, 3:57 PM
Marvin
06/30/2025, 3:58 PM
python
from marvin import Agent, fn
from prefect import flow  # needed for the @flow decorator used below
from marvin.tools.image import ImageTool
from PIL import Image
import io
@Agent
class ImageAnalysisAgent:
model = "openai:gpt-4-vision-preview" # Note: Use vision model for image analysis
@fn
def analyze_image(self, image_path: str) -> dict:
"""
Analyze the content of an image and provide detailed description.
"""
# Load and process image using ImageTool
image_tool = ImageTool()
image_data = image_tool.load(image_path)
return image_data
@fn
def generate_image_caption(self, image_path: str) -> str:
"""
Generate a descriptive caption for the image.
"""
pass
# Example usage
@flow
def process_image_flow(image_path: str):
agent = ImageAnalysisAgent()
analysis = agent.analyze_image(image_path)
caption = agent.generate_image_caption(image_path)
return {"analysis": analysis, "caption": caption}
2. Web Search
python
from marvin import Agent, fn
from prefect import flow  # needed for the @flow decorator used below
from marvin.tools.web import WebSearchTool, WebScrapeTool
from typing import List, Dict
@Agent
class WebResearchAgent:
model = "openai:gpt-4"
def __init__(self):
self.search_tool = WebSearchTool()
self.scrape_tool = WebScrapeTool()
@fn
def search_web(self, query: str, max_results: int = 5) -> List[Dict]:
"""
Perform web search and return results.
"""
results = self.search_tool.search(query, max_results=max_results)
return results
@fn
def scrape_webpage(self, url: str) -> Dict:
"""
Scrape content from a webpage.
"""
content = self.scrape_tool.scrape(url)
return content
# Example usage
@flow
def web_research_flow(query: str):
agent = WebResearchAgent()
search_results = agent.search_web(query)
detailed_results = []
for result in search_results:
content = agent.scrape_webpage(result['url'])
detailed_results.append(content)
return detailed_results
3. Audio Files
python
from marvin import Agent, fn
from prefect import flow  # needed for the @flow decorator used below
from marvin.tools.audio import AudioTool
import numpy as np
from typing import Dict
@Agent
class AudioProcessingAgent:
model = "openai:gpt-4"
def __init__(self):
self.audio_tool = AudioTool()
@fn
def transcribe_audio(self, audio_path: str) -> str:
"""
Transcribe speech from audio file to text.
"""
return self.audio_tool.transcribe(audio_path)
@fn
def analyze_audio(self, audio_path: str) -> Dict:
"""
Analyze audio characteristics and content.
"""
audio_data = self.audio_tool.load(audio_path)
return {
"duration": audio_data.duration,
"sample_rate": audio_data.sample_rate,
"channels": audio_data.channels,
"analysis": "Detailed analysis of audio content"
}
# Example usage
@flow
def process_audio_flow(audio_path: str):
agent = AudioProcessingAgent()
# Transcribe audio to text
transcript = agent.transcribe_audio(audio_path)
# Analyze audio characteristics
analysis = agent.analyze_audio(audio_path)
return {
"transcript": transcript,
"analysis": analysis
}
4. Combined Multimedia Processing
```python
from marvin import Agent, fn
```
Sanaz
06/30/2025, 4:28 PM
Marvin
06/30/2025, 4:28 PM
Marvin
06/30/2025, 4:30 PM
python
# Core Marvin imports
from marvin import Agent
from marvin.tools import search, web, images, audio
from prefect import flow, task
# If you need specific tools
from marvin.tools.search import DuckDuckGoSearchTool
from marvin.tools.web import WebScraper
from marvin.tools.images import ImageAnalyzer
from marvin.tools.audio import AudioProcessor
# Example implementation
@Agent
class MultiMediaAgent:
model = "openai:gpt-4"
def __init__(self):
self.search_tool = DuckDuckGoSearchTool()
self.web_scraper = WebScraper()
self.image_analyzer = ImageAnalyzer()
self.audio_processor = AudioProcessor()
def search_and_analyze(self, query: str):
# Implement your search and analysis logic here
results = self.search_tool.search(query)
return results
@flow
def multimedia_flow(query: str):
agent = MultiMediaAgent()
results = agent.search_and_analyze(query)
return results
Alternatively, for a simpler approach using just the basic tools:
python
from marvin import Agent
from prefect import flow, task
import requests
from bs4 import BeautifulSoup
from duckduckgo_search import ddg
@Agent
class SimpleWebSearchAgent:
model = "openai:gpt-4"
def search_web(self, query: str, max_results: int = 5):
try:
results = ddg(query, max_results=max_results)
return list(results)
except Exception as e:
print(f"Search error: {e}")
return []
def scrape_webpage(self, url: str):
try:
response = requests.get(url)
soup = BeautifulSoup(response.text, 'html.parser')
return soup.get_text()
except Exception as e:
print(f"Scraping error: {e}")
return ""
@flow
def search_flow(query: str):
agent = SimpleWebSearchAgent()
search_results = agent.search_web(query)
all_content = []
for result in search_results:
content = agent.scrape_webpage(result['link'])
all_content.append({
'url': result['link'],
'content': content
})
return all_content
# Usage
if __name__ == "__main__":
results = search_flow("artificial intelligence latest developments")
print(results)
For this to work, make sure you have the required dependencies:
bash
pip install marvin
pip install prefect
pip install duckduckgo-search
pip install beautifulsoup4
pip install requests
Note: The exact tool names and imports might vary slightly depending on your Marvin version. If you encounter any specific import errors, please let me know the version you're using, and I can provide the exact imports for that version.
Would you like me to provide more specific examples for any particular functionality (web search, image processing, or audio processing)?
Sanaz
06/30/2025, 4:54 PM
Marvin
06/30/2025, 4:54 PM
Marvin
06/30/2025, 4:55 PM
python
from marvin import Agent
from prefect import flow, task
from typing import List, Dict
# For web search
from duckduckgo_search import ddg
import requests
from bs4 import BeautifulSoup
# For image processing (if needed)
from PIL import Image
import io
# For audio processing (if needed)
import librosa
class WebSearchTool:
def search(self, query: str, max_results: int = 5) -> List[Dict]:
try:
results = ddg(query, max_results=max_results)
return list(results)
except Exception as e:
print(f"Search error: {e}")
return []
def scrape(self, url: str) -> str:
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
response = requests.get(url, headers=headers, timeout=10)
soup = BeautifulSoup(response.text, 'html.parser')
return soup.get_text()
except Exception as e:
print(f"Scraping error: {e}")
return ""
class ImageTool:
def load_image(self, image_path: str):
try:
return Image.open(image_path)
except Exception as e:
print(f"Image loading error: {e}")
return None
def analyze_image(self, image):
# Add your image analysis logic here
return {
"size": image.size,
"mode": image.mode,
"format": image.format
}
class AudioTool:
def load_audio(self, audio_path: str):
try:
return librosa.load(audio_path)
except Exception as e:
print(f"Audio loading error: {e}")
return None
def analyze_audio(self, audio_data):
# Add your audio analysis logic here
return {
"duration": librosa.get_duration(y=audio_data[0], sr=audio_data[1]),
"sample_rate": audio_data[1]
}
@Agent
class MultiMediaAgent:
model = "openai:gpt-4"
def __init__(self):
self.web_tool = WebSearchTool()
self.image_tool = ImageTool()
self.audio_tool = AudioTool()
def search_and_analyze(self, query: str) -> Dict:
"""
Perform web search and analysis.
"""
search_results = self.web_tool.search(query)
analyzed_results = []
for result in search_results:
content = self.web_tool.scrape(result['link'])
analyzed_results.append({
'url': result['link'],
'title': result['title'],
'content': content
})
return analyzed_results
@flow
def multimedia_research_flow(
query: str,
max_results: int = 5
) -> Dict:
"""
Execute a multimedia research flow.
Args:
query: Search query
max_results: Maximum number of results to return
"""
agent = MultiMediaAgent()
results = agent.search_and_analyze(query)
return results
# Example usage
if __name__ == "__main__":
# Install required dependencies first:
# pip install marvin prefect duckduckgo-search beautifulsoup4 requests Pillow librosa
query = "Latest developments in artificial intelligence"
results = multimedia_research_flow(query)
print(results)
To use this code, you'll need to install the required dependencies:
```bash
# Core requirements
pip install marvin
pip install prefect
# For web search and scraping
pip install duckduckgo-search
```
Sanaz
07/01/2025, 2:12 PM
Marvin
07/01/2025, 2:12 PM
Marvin
07/01/2025, 2:13 PM
bash
pip install marvin prefect smol-agent selenium webdriver-manager
2. Here's an example that combines SmolAgent's web browsing capabilities with Marvin's agent framework:
python
from marvin import Agent
from prefect import flow, task
from smol_agent.agent import SmolAgent
from smol_agent.tools import WebBrowserTool
from typing import Dict, List
import asyncio

class WebBrowserAgent:
    def __init__(self):
        self.browser_tool = WebBrowserTool()
        self.smol_agent = SmolAgent(
            tools=[self.browser_tool],
            llm_name="gpt-4"
        )

    async def visit_webpage(self, url: str) -> str:
        """Visit a webpage and extract its content using SmolAgent."""
        prompt = f"""
        Visit this webpage: {url}
        Extract the main content and any relevant information.
        Ignore ads and navigation elements.
        """
        result = await self.smol_agent.run(prompt)
        return result

    async def search_and_browse(self, query: str) -> str:
        """Perform a search and browse results using SmolAgent."""
        prompt = f"""
        Search for: {query}
        Visit the most relevant result and extract key information.
        """
        result = await self.smol_agent.run(prompt)
        return result

@Agent
class MarvinWebResearchAgent:
    model = "openai:gpt-4"

    def __init__(self):
        self.web_browser = WebBrowserAgent()

    async def research_topic(self, topic: str) -> Dict:
        """
        Research a topic using SmolAgent for web browsing and Marvin for analysis.
        """
        # Use SmolAgent to gather information
        raw_info = await self.web_browser.search_and_browse(topic)
        # Use Marvin to analyze and structure the information
        analysis = {
            "topic": topic,
            "raw_information": raw_info,
            "key_points": "Extract key points here",
            "summary": "Provide summary here"
        }
        return analysis

@task
async def perform_research(topic: str) -> Dict:
    agent = MarvinWebResearchAgent()
    return await agent.research_topic(topic)

@flow
def web_research_flow(topic: str) -> Dict:
    """
    Orchestrate web research using both SmolAgent and Marvin.
    """
    return asyncio.run(perform_research(topic))

# Example with specific webpage visits
class DetailedWebResearchAgent:
    def __init__(self):
        self.web_browser = WebBrowserAgent()

    async def analyze_multiple_pages(self, urls: List[str]) -> List[Dict]:
        """
        Visit and analyze multiple web pages.
        """
        results = []
        for url in urls:
            content = await self.web_browser.visit_webpage(url)
            results.append({
                "url": url,
                "content": content
            })
        return results

@flow
def detailed_research_flow(urls: List[str]) -> List[Dict]:
    """
    Analyze specific web pages using SmolAgent's browsing capabilities.
    """
    agent = DetailedWebResearchAgent()
    return asyncio.run(agent.analyze_multiple_pages(urls))

# Example usage
if __name__ == "__main__":
    # Example 1: General topic research
    topic = "Latest developments in quantum computing"
    results = web_research_flow(topic)
    print("Topic Research Results:", results)
    # Example 2: Specific webpage analysis
    urls = [
        "https://example.com/page1",
        "https://example.com/page2"
    ]
    page_analyses = detailed_research_flow(urls)
    print("Webpage Analyses:", page_analyses)
3. For more advanced web interactions, you can extend the SmolAgent capabilities:
```python
```
Sanaz
07/01/2025, 2:45 PM
Marvin
07/01/2025, 2:45 PM
Marvin
07/01/2025, 2:46 PM
python
from marvin import Agent
from prefect import flow, task
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
from bs4 import BeautifulSoup
from typing import Dict, List
import time

class WebBrowserTool:
    def __init__(self):
        # Set up Chrome options
        self.chrome_options = Options()
        self.chrome_options.add_argument('--headless')  # Run in headless mode
        self.chrome_options.add_argument('--no-sandbox')
        self.chrome_options.add_argument('--disable-dev-shm-usage')
        # Initialize the webdriver
        self.driver = webdriver.Chrome(
            service=Service(ChromeDriverManager().install()),
            options=self.chrome_options
        )

    def visit_webpage(self, url: str) -> str:
        """Visit a webpage and extract its content."""
        try:
            self.driver.get(url)
            # Wait for dynamic content to load
            time.sleep(2)
            # Get the page source and parse with BeautifulSoup
            soup = BeautifulSoup(self.driver.page_source, 'html.parser')
            # Remove unwanted elements
            for element in soup(['script', 'style', 'nav', 'footer']):
                element.decompose()
            # Use a separator so text from adjacent elements does not run together
            return soup.get_text(separator=" ", strip=True)
        except Exception as e:
            print(f"Error visiting webpage: {e}")
            return ""

    def __del__(self):
        """Clean up the webdriver when done."""
        if hasattr(self, 'driver'):
            self.driver.quit()

@Agent
class WebResearchAgent:
    model = "openai:gpt-4"

    def __init__(self):
        self.browser = WebBrowserTool()

    def analyze_webpage(self, url: str) -> Dict:
        """
        Visit and analyze a webpage using Marvin's capabilities.
        """
        content = self.browser.visit_webpage(url)
        # Use the agent to analyze the content
        analysis = {
            "url": url,
            "content": content,
            "summary": "Summary will be generated here",
            "key_points": ["Key points will be extracted here"]
        }
        return analysis

@flow
def web_research_flow(urls: List[str]) -> List[Dict]:
    """
    Research flow that visits and analyzes multiple webpages.
    """
    agent = WebResearchAgent()
    results = []
    for url in urls:
        analysis = agent.analyze_webpage(url)
        results.append(analysis)
    return results

# Example usage
if __name__ == "__main__":
    # First, install required dependencies:
    # pip install marvin prefect selenium webdriver-manager beautifulsoup4
    urls = [
        "https://example.com/page1",
        "https://example.com/page2"
    ]
    results = web_research_flow(urls)
    print("Research Results:", results)
To use this code, you'll need to install these dependencies:
bash
pip install marvin
pip install prefect
pip install selenium
pip install webdriver-manager
pip install beautifulsoup4
This solution provides:
1. A WebBrowserTool that uses Selenium for actual web browsing
2. Integration with Marvin's Agent for content analysis
3. A Prefect flow for orchestrating the research process
The advantages of this approach:
- Can handle dynamic JavaScript-rendered content
- More reliable than simple HTTP requests
- Can be extended to handle more complex web interactions
- Integrates well with Marvin's AI capabilities
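The cleanup step in `visit_webpage` (dropping `script`, `style`, `nav`, and `footer` before extracting text) can be tried without a browser. This is only a rough sketch using the standard library's `html.parser` as a stand-in for BeautifulSoup; the names `VisibleTextExtractor` and `extract_visible_text` are made up for illustration and are not part of Selenium, BeautifulSoup, or Marvin:

```python
from html.parser import HTMLParser

# Tags whose contents are dropped, mirroring the list used in visit_webpage
SKIPPED_TAGS = {"script", "style", "nav", "footer"}


class VisibleTextExtractor(HTMLParser):
    """Collect text that is not nested inside any skipped tag."""

    def __init__(self):
        super().__init__()
        self._skip_depth = 0  # > 0 while inside a skipped tag
        self.parts = []

    def handle_starttag(self, tag, attrs):
        if tag in SKIPPED_TAGS:
            self._skip_depth += 1

    def handle_endtag(self, tag):
        if tag in SKIPPED_TAGS and self._skip_depth > 0:
            self._skip_depth -= 1

    def handle_data(self, data):
        text = data.strip()
        if text and self._skip_depth == 0:
            self.parts.append(text)


def extract_visible_text(html: str) -> str:
    """Return the remaining text fragments joined by single spaces."""
    parser = VisibleTextExtractor()
    parser.feed(html)
    parser.close()
    return " ".join(parser.parts)
```

Feeding it the string from `driver.page_source` should give roughly the same kind of output as the BeautifulSoup version, though real pages with malformed markup are better handled by BeautifulSoup itself.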
Would you like me to explain any part of this implementation in more detail or show how to extend it for specific use cases?
Sanaz
07/01/2025, 8:53 PM
Marvin
07/01/2025, 8:53 PM
Marvin
07/01/2025, 8:53 PM
`ai_fn` decorator was part of an older version of Marvin, but the import structure has changed.
Here's how to fix it:
1. First, make sure you have the latest version of Marvin installed:
bash
pip install -U marvin
2. Then, update your import statement to:
python
from marvin.components import ai_fn
If you're still having issues after that, you might want to check:
1. Your Python environment is clean (no conflicting packages)
2. You have the correct dependencies installed
3. Your OpenAI API key is properly configured
Also, if you're following any tutorials or documentation, make sure you're looking at the most current version, as the Marvin API has evolved over time.
If you continue to have issues, please let me know what version of Marvin you're using (you can check with `pip show marvin`), and I can provide more specific guidance.
Sanaz
07/01/2025, 8:57 PM
Marvin
07/01/2025, 8:57 PM
Marvin
07/01/2025, 8:57 PM
`ai_fn` decorator has been replaced with a new pattern.
Here's how to use function creation in Marvin 3.x:
python
from marvin import fn

# Example usage:
@fn
def your_function(input: str) -> str:
    """Your function description here"""
    return NotImplemented
The change from `ai_fn` to `fn` is part of Marvin's 3.0 release which included significant API changes to make the library more intuitive and consistent.
If you're following any tutorials or documentation that uses `ai_fn`, you're likely looking at outdated material for Marvin 1.x or 2.x. I'd recommend referring to the current documentation at:
- https://www.askmarvin.ai/components/functions/
- https://www.askmarvin.ai/welcome/quickstart/
Let me know if you need any clarification on using the new `fn` decorator!
Sanaz
07/07/2025, 7:29 PM
Marvin
07/07/2025, 7:29 PM
Marvin
07/07/2025, 7:29 PM
bash
pip install pandas openpyxl
2. Here's a basic example of a Prefect flow that reads an Excel file:
python
from prefect import flow, task
import pandas as pd

@task
def read_excel_data(file_path: str) -> pd.DataFrame:
    return pd.read_excel(file_path)

@task
def process_data(df: pd.DataFrame):
    # Add your data processing logic here
    return df

@flow
def excel_processing_flow(file_path: str):
    # Read the Excel file
    df = read_excel_data(file_path)
    # Process the data
    processed_data = process_data(df)
    return processed_data

# Run the flow
if __name__ == "__main__":
    result = excel_processing_flow("path/to/your/excel_file.xlsx")
Some additional tips for working with Excel data:
1. Specific Sheets: If you need to read a specific sheet:
python
@task
def read_specific_sheet(file_path: str, sheet_name: str) -> pd.DataFrame:
    return pd.read_excel(file_path, sheet_name=sheet_name)
2. Multiple Sheets: To read all sheets:
python
@task
def read_all_sheets(file_path: str) -> dict:
    # sheet_name=None returns a dict mapping sheet names to DataFrames
    return pd.read_excel(file_path, sheet_name=None)
3. Error Handling: It's good practice to add error handling:
python
from prefect import flow, task
import pandas as pd

@task
def read_excel_data(file_path: str) -> pd.DataFrame:
    try:
        return pd.read_excel(file_path)
    except FileNotFoundError:
        raise ValueError(f"Excel file not found at: {file_path}")
    except Exception as e:
        raise ValueError(f"Error reading Excel file: {str(e)}")
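4. Large Files: Unlike pd.read_csv, pd.read_excel has no chunksize parameter, so for large Excel files you can read one window of rows at a time with skiprows and nrows:
python
@task
def read_excel_window(file_path: str, start_row: int = 0, nrows: int = 1000) -> pd.DataFrame:
    # Keep the header (row 0) and skip the data rows that come before start_row
    return pd.read_excel(
        file_path,
        skiprows=range(1, start_row + 1),
        nrows=nrows
    )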
Remember that pandas provides many options for reading Excel files, such as:
- Specifying ranges using `usecols` and `skiprows`
- Handling date formats with `parse_dates`
- Managing missing values with `na_values`
Example with more options:
python
@task
def read_excel_advanced(file_path: str) -> pd.DataFrame:
    return pd.read_excel(
        file_path,
        sheet_name='Sheet1',
        usecols='A:D',                 # Specify columns to read
        skiprows=1,                    # Skip first row
        na_values=['NA', 'missing'],   # Custom NA values
        parse_dates=['Date_Column']    # Parse date columns
    )
The data read from Excel will be available as a pandas DataFrame within your flow, which you can then process, transform, or analyze as needed. You can combine this with other Prefect features like:
- Logging the shape of your data
- Adding retries for network-based Excel files
- Using parameters to make the file path configurable
- Implementing data validation tasks
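As a rough illustration of the data validation idea, here is a minimal sketch that checks for required columns. It works on plain lists of column names so it runs without pandas; in a flow you would probably pass `list(df.columns)` and wrap the check in `@task`. The function names and the example column names are made up for illustration:

```python
def find_missing_columns(columns, required):
    """Return the required column names that are absent, in the order given."""
    present = set(columns)
    return [name for name in required if name not in present]


def validate_columns(columns, required):
    """Raise ValueError when any required column is missing."""
    missing = find_missing_columns(columns, required)
    if missing:
        raise ValueError(f"Missing required columns: {missing}")
    return True


# Hypothetical column names, standing in for list(df.columns)
columns = ["Date", "Amount"]
missing = find_missing_columns(columns, ["Date", "Amount", "Region"])
```

Raising an exception makes the Prefect task run fail, so downstream tasks do not process a sheet with the wrong shape.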
This approach gives you a robust way to incorporate Excel data processing into your Prefect workflows while maintaining all the benefits of Prefect's task and flow management capabilities.
Sanaz
07/08/2025, 4:37 PM
Marvin
07/08/2025, 4:37 PM
Marvin
07/08/2025, 4:38 PM
`@fn` decorator? Are you seeing it in some Prefect code, or is it from a different library? Also, could you share a code snippet showing how it's being used? This would help me give you a more accurate and helpful answer.
The most common decorators you'll see in Prefect are `@flow` and `@task`, but I don't recognize `@fn` as a standard Prefect decorator. With more context, I can provide a much more specific and useful explanation.
Sanaz
07/08/2025, 4:41 PM
Marvin
07/08/2025, 4:41 PM
Marvin
07/08/2025, 4:41 PM
`@fn` is likely a decorator from the Marvin framework that marks this function as a callable unit within their system. It's being used to decorate an orchestration function that routes tasks to different AI agents based on their type.
Let me break down what's happening in this code:
1. The `@fn` decorator marks this Python function as a special Marvin function that can be integrated into their workflow system
2. The decorated function (`orchestration_agent`) takes two parameters:
- `task`: A dictionary containing instructions
- `context`: A dictionary with the question and other relevant details
3. The function then:
- Uses Marvin's LLM to decide which specialized agent should handle the task
- Maps the LLM's decision to actual agent implementations
- Executes the chosen agent with the task instructions
- Returns the result
This pattern is common in AI orchestration frameworks where you need to mark certain functions as special handlers or processors. The decorator likely adds some functionality like:
- Error handling
- Logging
- Input validation
- Context management
- Integration with the broader Marvin framework
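The "map the LLM's decision to an agent" step can be sketched in plain Python. This is only an illustration of the routing logic: `pick_agent` is a made-up helper, and the strings below stand in for real agent objects. It is not how Marvin itself selects agents:

```python
def pick_agent(decision: str, agent_map: dict, default):
    """Return the first agent whose keyword appears in the decision text."""
    decision = decision.lower()
    for keyword, agent in agent_map.items():
        if keyword in decision:
            return agent
    return default


# Strings stand in for agent objects in this sketch
agent_map = {
    "search": "search_agent",
    "file": "file_processing_agent",
    "image": "gpt_image_agent",
}

chosen = pick_agent("The Search agent should handle this.", agent_map, "answer_agent")
fallback = pick_agent("No idea.", agent_map, "answer_agent")
```

Substring matching is simple but loose: a decision containing "profile" would match the "file" keyword, so asking the LLM to answer with exactly one agent name is usually more reliable.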
However, since this isn't Prefect code, I'd recommend checking the Marvin framework's documentation for the exact specifics of what their `@fn` decorator provides. If you're working with Prefect, you'd use `@flow` or `@task` decorators instead, which serve similar but Prefect-specific purposes.
Sanaz
07/08/2025, 5:34 PM
Marvin
07/08/2025, 5:34 PM
Marvin
07/08/2025, 5:35 PM
`.id` attribute on a function object where Jinja2 is expecting a different type of object. Common scenarios where this might happen include:
1. Passing a raw function instead of a flow/task object to a template
2. Incorrect template variable references
3. Missing decorators that would transform the function into the expected object type
Without seeing your code, here are some general tips to check:
1. If you're working with flows, make sure they're properly decorated with @flow
2. If you're using templates in notifications or webhooks, verify that you're referencing the correct context variables
3. Check that any functions you're passing to templates are properly wrapped or transformed into the expected object type
If you could share the relevant code snippets, I can provide more specific guidance on fixing the issue.
Sanaz
07/08/2025, 5:39 PM
Marvin
07/08/2025, 5:39 PM
Marvin
07/08/2025, 5:39 PM
Sanaz
07/08/2025, 5:45 PM
Marvin
07/08/2025, 5:45 PM
Sanaz
07/08/2025, 5:47 PM
Marvin
07/08/2025, 5:47 PM
Marvin
07/08/2025, 5:47 PM
`@fn` decorator might not be properly transforming the functions into objects with the required attributes that Jinja2 is expecting
2. The `marvin.run()` results might be getting passed to a template that expects an object with an `id` attribute
3. The `QA` result type (which isn't defined in the shared code) might be expecting certain attributes
Here are some potential fixes:
1. First, make sure you have the correct imports and decorators:
python
from marvin import fn  # Make sure you're importing the decorator correctly

# Then use it as
@fn
def search_function(query: str) -> list[str]:
    ...
2. Define your QA type properly (if you haven't already):
python
from dataclasses import dataclass

@dataclass
class QA:
    Answer: str
    # Add any other fields you need
3. If you're using templates anywhere, make sure you're passing the correct object structure. For example:
python
# Instead of passing the function directly
context = {"func": search_function} # Wrong
# Pass the result or a proper object
context = {"result": search_function(query)} # Better
Could you also share:
1. Where exactly (which line) the error is occurring?
2. The definition of your `QA` type if it's different from what I suggested?
3. Any template files you might be using?
Also, a few suggestions for your current code:
1. Consider adding error handling around your agent calls:
python
try:
    result = marvin.run(task["instructions"], agents=[chosen_agent], context=context)
except Exception as e:
    print(f"Error executing agent: {e}")
    result = f"Error: {str(e)}"
return result
2. You might want to add logging to debug the agent selection:
python
print(f"Agent decision: {agent_decision}")
print(f"Chosen agent: {chosen_agent.name if chosen_agent else 'None'}")
This would help track where things might be going wrong in the orchestration process.
Sanaz
07/08/2025, 5:52 PM
Marvin
07/08/2025, 5:52 PM
Marvin
07/08/2025, 5:52 PM
`marvin.run()`.
Here's how to fix it:
1. First, create an Agent for the orchestration function:
python
# Create an orchestration agent instead of using the raw function
orchestration_agent = marvin.Agent(
    name="orchestrator",
    instructions="Route tasks to appropriate specialized agents based on the task requirements.",
    tools=[orchestration_agent]  # Your existing orchestration function becomes a tool
)
2. Then modify your main workflow to use this agent:
python
if __name__ == "__main__":
    with marvin.Thread() as thread:
        question = dataset[20]['Question']
        fileinfo = dataset[20]['file_path']
        Q = question + fileinfo
        plan = marvin.plan(Q)
        for i, task in enumerate(plan, 1):
            print(f"{i}. {task.instructions}")
        outputs = []
        for task in plan:
            print(f"\nExecuting task: {task.instructions}")
            # Use the orchestration agent object, not the function
            res = marvin.run(
                task.instructions,
                agents=[orchestration_agent],  # Now using the Agent object
                context={"question": question}
            )
            outputs.append(res)
        final = marvin.run(
            "Using the gathered outputs, write the final answer.",
            agents=[answer_agent],
            result_type=QA,
            context={"question": question, "info": outputs}
        )
        print("\n:bulb: Final Answer:\n")
        print(final.Answer)
The error occurs because Marvin's internal templating system expects an Agent object with certain attributes (including an `id`), but it's getting a raw function instead. By wrapping your orchestration function in a proper Agent object, you provide the structure that Marvin's templating system expects.
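To see the mismatch in isolation, here is a small sketch that flags plain functions in an `agents` list before it is passed along. `StubAgent` and `find_bare_functions` are made-up names for illustration; `StubAgent` is only a stand-in with an `id`, not Marvin's real Agent class:

```python
import inspect


class StubAgent:
    """Hypothetical stand-in for an agent object; only `id` and `name` matter here."""

    def __init__(self, name: str):
        self.name = name
        self.id = f"agent-{name}"


def find_bare_functions(agents):
    """Return the names of entries that are plain functions, not agent objects."""
    return [entry.__name__ for entry in agents if inspect.isfunction(entry)]


def route_task(task: dict) -> str:
    # A plain function like this has no `id` attribute
    return task["instructions"]


agents = [StubAgent("answer"), route_task]
bare = find_bare_functions(agents)
```

A check like this near the `marvin.run()` call would turn the template error into a clearer message naming the offending function. Whether a decorated function still counts as a plain function depends on how the decorator wraps it, so treat this as a debugging aid rather than a guarantee.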
Also, for better error handling and debugging, you might want to add:
python
@fn
def orchestration_agent(task: dict, context: dict) -> str:
    try:
        instructions = task["instructions"].lower()
        print(f"Processing instructions: {instructions}")  # Debug logging
        agent_decision = marvin.run(
            f"Based on the following task instructions: '{instructions}', which agent should handle this task?",
            agents=[answer_agent],
            context=context
        )
        print(f"Agent decision: {agent_decision}")  # Debug logging
        # ... rest of your function ...
    except Exception as e:
        print(f"Error in orchestration: {e}")
        raise
This should resolve the Jinja2 template error and provide better visibility into the orchestration process.
Sanaz
07/08/2025, 6:11 PM
Marvin
07/08/2025, 6:11 PM
Marvin
07/08/2025, 6:11 PM
python
@fn
def orchestration_agent(task: dict, context: dict) -> str:
    instructions = task["instructions"].lower()
    file_path = context.get("file_path", "")  # Get file path from context
    # Enhanced decision making for image tasks
    if any(word in instructions.lower() for word in ["image", "picture", "photo", "graph", "plot", "figure"]) or \
            any(ext in file_path.lower() for ext in [".jpg", ".jpeg", ".png", ".gif"]):
        print("Routing to GPT Image Agent")  # Debug logging
        result = marvin.run(
            instructions,
            agents=[gpt_image_agent],
            context={"file_path": file_path, "question": context.get("question", "")}
        )
        return result
    # Regular agent selection for other tasks
    agent_decision = marvin.run(
        f"Based on the following task instructions: '{instructions}', which agent should handle this task?",
        agents=[answer_agent],
        context=context
    )
    agent_map = {
        "search": search_agent,
        "file": file_processing_agent,
        "arxiv": arxiv_agent,
        "image": gpt_image_agent,
        "audio": audio_transcription_agent,
        "youtube": youtube_download_agent
    }
    chosen_agent = None
    for agent_name, agent in agent_map.items():
        if agent_name in agent_decision.lower():
            chosen_agent = agent
            break
    if not chosen_agent:
        chosen_agent = answer_agent
    result = marvin.run(instructions, agents=[chosen_agent], context=context)
    return result
Then modify your main workflow to include the file path in the context:
python
if __name__ == "__main__":
    with marvin.Thread() as thread:
        question = dataset[20]['Question']
        fileinfo = dataset[20]['file_path']
        Q = question + fileinfo
        plan = marvin.plan(Q)
        for i, task in enumerate(plan, 1):
            print(f"{i}. {task.instructions}")
        outputs = []
        for task in plan:
            print(f"\nExecuting task: {task.instructions}")
            # Include file_path in the context
            res = marvin.run(
                task.instructions,
                agents=[orchestration_agent],
                context={
                    "question": question,
                    "file_path": fileinfo,  # Add file path to context
                    "full_query": Q
                }
            )
            outputs.append(res)
        final = marvin.run(
            "Using the gathered outputs, write the final answer.",
            agents=[answer_agent],
            result_type=QA,
            context={"question": question, "info": outputs}
        )
        print("\n:bulb: Final Answer:\n")
        print(final.Answer)
Also, modify your gpt_image_agent definition to be more explicit about its capabilities:
python
gpt_image_agent = marvin.Agent(
    name="gpt-image",
    instructions="""Process and describe images with GPT-4o.
    I can analyze graphs, charts, photographs, and other visual content.
    I will extract relevant information and provide detailed descriptions based on the question.""",
    tools=[gpt_4o_image_function]
)
These changes should:
1. Better detect when a task requires image processing
2. Properly route image-related tasks to the GPT-4o image agent
3. Ensure the necessary file path information is available in the context
4. Make the image agent's capabilities more explicit to the orchestration system
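The image-detection check can be pulled out into a small helper and tested on its own. This is a sketch that mirrors the keyword list above but compares the file extension exactly instead of searching the whole path; `is_image_task` is a made-up name:

```python
import os

IMAGE_KEYWORDS = ("image", "picture", "photo", "graph", "plot", "figure")
IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".gif"}


def is_image_task(instructions: str, file_path: str = "") -> bool:
    """Return True when the instructions or the file extension suggest an image task."""
    text = instructions.lower()
    extension = os.path.splitext(file_path.lower())[1]
    mentions_image = any(word in text for word in IMAGE_KEYWORDS)
    return mentions_image or extension in IMAGE_EXTENSIONS
```

Keyword matching by substring can over-trigger (for example, "configure" contains "figure"), so the file extension is usually the more trustworthy signal when a file path is available.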
If you're still having issues, could you share:
1. An example of the task instructions that are failing
2. The contents of `dataset[20]` (both Question and file_path)
3. Whether the image file exists at the specified path
Sanaz
07/08/2025, 7:32 PM
Marvin
07/08/2025, 7:32 PM
Marvin
07/08/2025, 7:32 PM
python
import os
import traceback

@fn
def orchestration_agent(task: dict, context: dict) -> str:
    print("\n=== Orchestration Debug ===")
    print(f"Received task: {task}")
    print(f"Context: {context}")
    instructions = task["instructions"].lower()
    # Debug the instruction analysis
    print("\nAnalyzing instructions...")
    is_image_task = any(word in instructions.lower() for word in ["image", "picture", "photo", "graph", "plot", "figure", "ocr"])
    print(f"Is image task? {is_image_task}")
    # If it's clearly an image task, route directly
    if is_image_task:
        print("\nRouting directly to GPT Image Agent")
        try:
            result = marvin.run(
                instructions,
                agents=[gpt_image_agent],
                context=context
            )
            print(f"Image agent result: {result}")
            return result
        except Exception as e:
            print(f"Error in image agent execution: {str(e)}")
            raise
    # If not clearly an image task, use LLM for decision
    print("\nAsking LLM for agent decision...")
    try:
        agent_decision = marvin.run(
            f"Based on the following task instructions: '{instructions}', which agent should handle this task?",
            agents=[answer_agent],
            context=context
        )
        print(f"LLM agent decision: {agent_decision}")
    except Exception as e:
        print(f"Error in agent decision: {str(e)}")
        raise
    agent_map = {
        "search": search_agent,
        "file": file_processing_agent,
        "arxiv": arxiv_agent,
        "image": gpt_image_agent,
        "audio": audio_transcription_agent,
        "youtube": youtube_download_agent
    }
    chosen_agent = None
    for agent_name, agent in agent_map.items():
        if agent_name in agent_decision.lower():
            chosen_agent = agent
            print(f"\nChosen agent: {agent_name}")
            break
    if not chosen_agent:
        print("\nNo specific agent matched, defaulting to answer agent")
        chosen_agent = answer_agent
    print(f"\nExecuting task with {chosen_agent.name} agent...")
    try:
        result = marvin.run(instructions, agents=[chosen_agent], context=context)
        print(f"Execution result: {result}")
        return result
    except Exception as e:
        print(f"Error in task execution: {str(e)}")
        raise

# Also modify your main workflow loop for debugging
if __name__ == "__main__":
    with marvin.Thread() as thread:
        question = dataset[20]['Question']
        fileinfo = dataset[20]['file_path']
        Q = question + fileinfo
        plan = marvin.plan(Q)
        print("\n=== Execution Plan ===")
        for i, task in enumerate(plan, 1):
            print(f"{i}. {task.instructions}")
        outputs = []
        for i, task in enumerate(plan):
            print(f"\n=== Executing Task {i+1} ===")
            print(f"Instructions: {task.instructions}")
            print(f"File path exists: {os.path.exists(fileinfo)}")  # Add this line
            try:
                res = marvin.run(
                    task.instructions,
                    agents=[orchestration_agent],
                    context={
                        "question": question,
                        "file_path": fileinfo,
                        "full_query": Q
                    }
                )
                print(f"Task {i+1} Result: {res}")
                outputs.append(res)
            except Exception as e:
                print(f"Error in task {i+1}: {str(e)}")
                print(f"Error type: {type(e)}")
                print(f"Full traceback: {traceback.format_exc()}")
Also, let's verify the image processing function:
```python
# Add this debug wrapper
@fn
```
Marvin
07/08/2025, 7:32 PM
This debugging code will help us understand:
1. Which agent is being selected
2. Whether the file path is valid
3. What decisions are being made in the orchestration process
4. Where exactly any errors are occurring
Run this modified code and share the debug output - it will help us pinpoint exactly where the process is failing and why the image processing isn't being handled correctly.
Also, for this specific task (extracting red and green numbers), you might want to modify the gpt_image_agent's instructions to be more specific:
```python
gpt_image_agent = marvin.Agent(
    name="gpt-image",
    instructions="""Process images with GPT-4o to extract numerical information.
    I can identify and extract numbers of different colors, read text and numbers from images,
    and perform OCR-like tasks. For colored numbers, I will explicitly state the color and value of each number found.""",
    tools=[gpt_4o_image_function]
)
```
After you run this debug version, we can better understand where the process is getting stuck and make the necessary adjustments.
Sanaz
07/08/2025, 7:41 PM
Marvin
07/08/2025, 7:41 PM
Marvin
07/08/2025, 7:41 PM
Sanaz
07/08/2025, 7:43 PM
Marvin
07/08/2025, 7:43 PM
Marvin
07/08/2025, 7:44 PM
Marvin
07/08/2025, 7:44 PM
Also, update the GPT-4o image agent to be more specific about number extraction:
```python
gpt_image_agent = marvin.Agent(
    name="gpt-image",
    instructions="""
    Analyze images to extract colored numbers. When asked about numbers:
    1. Identify all numbers in the image
    2. Classify them by color (specifically red and green)
    3. Return them in a structured format:
    Red numbers: [list of numbers]
    Green numbers: [list of numbers]
    Be precise in color identification and number extraction.
    """,
    tools=[gpt_4o_image_function]
)
```
This revised version:
1. Handles the image analysis first, extracting all numbers in one go
2. Properly structures the statistical calculations
3. Includes better error handling and debugging output
4. Uses a more specific format for number extraction
5. Processes the entire workflow in a more logical sequence
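For the statistical step itself, here is a minimal sketch, assuming the task is the one described in this thread: the population standard deviation of the red numbers, the sample standard deviation of the green numbers, averaged and rounded to three decimals. The number lists are placeholders, not values from your image:

```python
import statistics


def average_of_deviations(red, green):
    """Average the population std dev of `red` and the sample std dev of `green`."""
    red_dev = statistics.pstdev(red)      # population standard deviation
    green_dev = statistics.stdev(green)   # sample standard deviation
    return round((red_dev + green_dev) / 2, 3)


# Placeholder values, not taken from any real image
red_numbers = [2, 4, 4, 4, 5, 5, 7, 9]
green_numbers = [1, 2, 3, 4, 5]
result = average_of_deviations(red_numbers, green_numbers)
print(result)  # 1.791
```

Keeping this calculation in ordinary Python, and using the image agent only to extract the two lists, avoids asking the LLM to do arithmetic.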
Try running this updated version and let me know what output you get. The debug information will help us identify any remaining issues.
Also, could you verify:
1. Does the image file exist at the specified path?
2. Can you share what the image looks like (or describe it) so we can verify the number extraction is working correctly?
Sanaz
07/08/2025, 7:50 PM
Marvin
07/08/2025, 7:50 PM
Marvin
07/08/2025, 7:50 PM
Marvin
07/08/2025, 7:50 PM
Key improvements in this version:
1. *Better Task Analysis*: The orchestration agent now uses a structured prompt to select the appropriate agent.
2. *Standardized Context*: All tasks receive consistent context including previous results.
3. *Robust Error Handling*: Each level has proper error handling and debugging output.
4. *Clear Agent Definitions*: Each agent has specific, detailed instructions about its capabilities.
5. *Task State Management*: Previous results are passed forward in the context.
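The "previous results are passed forward" idea can be sketched without Marvin. Here `runner` is a hypothetical callable standing in for the `marvin.run(...)` call, and `run_plan` and the `previous_results` context key are made-up names for illustration:

```python
def run_plan(instructions_list, runner, question):
    """Run each step in order, giving it the outputs of all earlier steps."""
    outputs = []
    for instructions in instructions_list:
        context = {
            "question": question,
            "previous_results": list(outputs),  # copy so later appends don't leak in
        }
        outputs.append(runner(instructions, context))
    return outputs


def fake_runner(instructions, context):
    # Stand-in for the real agent call; reports how many earlier results it saw
    return f"{instructions} (saw {len(context['previous_results'])} earlier results)"


results = run_plan(["extract numbers", "compute statistics"], fake_runner, "What is the average?")
```

Swapping `fake_runner` for a function that calls your orchestration agent keeps the loop itself easy to test.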
To use this system:
1. Make sure all your agent functions (`search_function`, `file_analysis_function`, etc.) are properly defined and return results in a consistent format.
2. Test with a simple task first to verify the orchestration is working:
```python
result = marvin.run(
    "Search for information about Python",
    agents=[orchestration_agent],
    context={"question": "What is Python?"}
)
```
3. Monitor the debug output to see where any issues occur.
If you're still having issues, let me know:
1. Are you seeing any specific error messages?
2. Which part of the workflow is failing?
3. Are the agents receiving the tasks and context correctly?
Sanaz
07/09/2025, 3:24 PM
`statistics.pstdev()` function in Python:\n\n ```python\n import statistics\n\n red_numbers = [10, 15, 25, 30]\n red_stdev = statistics.pstdev(red_numbers)\n ```\n\n3. Calculate the Standard Sample Deviation of the Green Numbers:\n This assumes the set is a sample of a larger population. It can be calculated using `statistics.stdev()`:\n\n ```python\n green_numbers = [12, 14, 22, 28, 30]\n green_stdev = statistics.stdev(green_numbers)\n ```\n\n4. Average the Standard Deviations:\n Calculate the average of the two deviations obtained above:\n\n ```python\n average_stdev = (red_stdev + green_stdev) / 2\n ```\n\n5. Round to Three Decimal Points:\n Finally, round the result to three decimal places:\n\n ```python\n average_stdev_rounded = round(average_stdev, 3)\n ```\n\nWithout the actual numbers from the image, the output here is a theoretical framework. You can replace red_numbers and green_numbers with the actual numbers from your image to get the result. If you follow these steps, using the numbers proper, you will obtain the accurate rounded result.", refusal=None, role='assistant', annotations=[], audio=None, function_call=None, tool_calls=None))], created=1752074622, model='gpt-4o-2024-08-06', object='chat.completion', service_tier='default', system_fingerprint='fp_07871e2ad8', usage=CompletionUsage(completion_tokens=413, prompt_tokens=63, total_tokens=476, completion_tokens_details=CompletionTokensDetails(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0), prompt_tokens_details=PromptTokensDetails(audio_tokens=0, cached_tokens=0)))
Marvin
07/09/2025, 3:24 PM
Marvin
07/09/2025, 3:24 PM
python
# Assumes `os` and `base64` are imported and `client` is an initialized OpenAI client
@fn
def gpt_4o_image_function(image_path: str, Q: str) -> str:
    try:
        print(f"\n=== Processing Image ===")
        print(f"Image path: {image_path}")
        print(f"Question: {Q}")
        # Verify file exists
        if not os.path.exists(image_path):
            return f"Error: Image file not found at {image_path}"
        # Read the image
        with open(image_path, "rb") as image_file:
            image_data = image_file.read()
        # Create a more direct prompt for the vision model
        prompt = """
        Look at this image and:
        1. Identify all numbers that are colored in RED. List them as numbers, not words.
        2. Identify all numbers that are colored in GREEN. List them as numbers, not words.
        Format your response EXACTLY like this:
        RED NUMBERS: [x, y, z]
        GREEN NUMBERS: [a, b, c]
        Only provide the lists of numbers, no explanation or additional text.
        """
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": prompt
                        },
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{base64.b64encode(image_data).decode('utf-8')}"
                            }
                        }
                    ]
                }
            ],
            max_tokens=100  # Limit response length to encourage concise output
        )
        result = response.choices[0].message.content
        print(f"Raw GPT-4o response: {result}")
        # Validate the response format
        if "RED NUMBERS:" not in result or "GREEN NUMBERS:" not in result:
            return "Error: Vision model did not return properly formatted number lists"
        return result
    except Exception as e:
        print(f"Error in image processing: {str(e)}")
        return f"Error processing image: {str(e)}"
And update the image agent definition:
python
gpt_image_agent = marvin.Agent(
    name="gpt-image",
    instructions="""You are a specialized image analysis agent that:
    1. ONLY extracts colored numbers from images
    2. Returns EXACTLY two lists: red numbers and green numbers
    3. Uses the format:
    RED NUMBERS: [x, y, z]
    GREEN NUMBERS: [a, b, c]
    4. Never provides explanations or theoretical examples
    5. Returns only the actual numbers found in the image""",
    tools=[gpt_4o_image_function]
)
Key changes made:
1. More specific prompt for the vision model
2. Strict response format requirement
3. Token limit to prevent theoretical explanations
4. Better error handling and validation
5. Clear instructions for the agent to only return actual numbers
The previous version was getting theoretical because:
1. The prompt wasn't specific enough
2. The model was falling back to explanation mode
3. There was no format enforcement
Try this updated version and let me know if you're still seeing the theoretical response instead of actual image analysis. If it still doesn't work, we can:
1. Try different prompt formulations
2. Add more strict response parsing
3. Implement additional validation steps
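As one possible form of stricter response parsing, here is a sketch that turns the `RED NUMBERS: [...]` / `GREEN NUMBERS: [...]` format into Python lists and fails loudly when a list is missing or contains something that is not a number. The helper names are made up for illustration:

```python
import re


def parse_number_list(response: str, label: str):
    """Extract the bracketed list that follows `label:` as a list of floats."""
    match = re.search(rf"{re.escape(label)}\s*:\s*\[([^\]]*)\]", response)
    if match is None:
        raise ValueError(f"Missing '{label}' list in response")
    items = [item.strip() for item in match.group(1).split(",") if item.strip()]
    # float() raises ValueError for placeholders such as "x"
    return [float(item) for item in items]


def parse_color_numbers(response: str) -> dict:
    """Parse both lists from a response in the agreed format."""
    return {
        "red": parse_number_list(response, "RED NUMBERS"),
        "green": parse_number_list(response, "GREEN NUMBERS"),
    }


parsed = parse_color_numbers("RED NUMBERS: [10, 15.5]\nGREEN NUMBERS: [12, 14, 22]")
```

Because placeholder output like `[x, y, z]` raises a `ValueError`, a theoretical answer from the model is caught here instead of flowing into the statistics step.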
Would you like to try this version, or should we modify it further?
Sanaz
07/09/2025, 3:35 PM
Marvin
07/09/2025, 3:35 PM
Marvin
07/09/2025, 3:36 PM
python
@fn
def orchestration_agent(task: dict, context: dict) -> str:
    print("\n=== Orchestration Debug ===")
    print(f"Task received: {task}")
    print(f"Context received: {context}")
    # Get instructions from task
    instructions = task.get("instructions") if isinstance(task, dict) else str(task)
    # Get file path from context
    file_path = context.get("fileinfo")
    # If this is an image-related task
    if any(word in instructions.lower() for word in ["image", "picture", "photo", "ocr", "numbers"]):
        print(f"\nRouting to image agent with path: {file_path}")
        chosen_agent = gpt_image_agent
        # Pass the full task instructions and file path to the image agent
        modified_context = {
            "file_path": file_path,
            "task_instructions": instructions,
            "image_path": file_path,  # Explicit path for the image function
            "Q": instructions  # Pass full instructions as the question
        }
    else:
        # ... other agent selection logic ...
        chosen_agent = answer_agent
        modified_context = context
    print(f"Selected agent: {chosen_agent.name}")
    print(f"Modified context: {modified_context}")
    # Execute with modified context
    result = marvin.run(
        instructions,
        agents=[chosen_agent],
        context=modified_context
    )
    return result

@fn
def gpt_4o_image_function(image_path: str, Q: str) -> str:
    print("\n=== Image Function Debug ===")
    print(f"Received image_path: {image_path}")
    print(f"Received task: {Q}")
    return get_image_description_gpt4o(image_path, Q)

def get_image_description_gpt4o(image_path: str, Q: str) -> str:
    print(f"\n=== GPT-4o Processing ===")
    print(f"Processing image at: {image_path}")
    print(f"Processing task: {Q}")
    print(f"File exists: {os.path.exists(image_path)}")
    try:
        with open(image_path, "rb") as image_file:
            image_data = image_file.read()
        # Modified prompt to focus on the specific task
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": f"Complete this specific task for the image: {Q}"
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{base64.b64encode(image_data).decode('utf-8')}"
                        }
                    }
                ]
            }]
        )
        return response.choices[0].message.content
    except FileNotFoundError:
        return f"Error: Image file not found at {image_path}"
    except Exception as e:
        return f"Error processing image: {str(e)}"

# Main workflow remains the same, but ensure context includes file path
if __name__ == "__main__":
    with marvin.Thread() as thread:
        question = dataset[20]['Question']
fileinfo = dataset[20]['file_path']
plan = marvin.plan(question)
outputs = []
for task in plan:
print(f"\nExecuting task: {task.instructions}")
res = marvin.run(
task.instructions,
agents=[orchestration_agent],
context={
"question": question,
"fileinfo": fileinfo,
"task": task
}
)
outputs.append(res)
Key changes:
1. Orchestration agent now properly propagates the file path and task instructions
2. Context is modified specifically for image tasksMarvin
07/09/2025, 3:36 PMSanaz
07/09/2025, 6:08 PMMarvin
07/09/2025, 6:08 PMMarvin
07/09/2025, 6:09 PMMarvin
07/09/2025, 6:09 PMSanaz
07/09/2025, 6:22 PMMarvin
07/09/2025, 6:22 PMMarvin
07/09/2025, 6:22 PMMarvin
07/09/2025, 6:22 PMAnd in your main workflow, ensure you're passing the file path correctly:
```python
if __name__ == "__main__":
    with marvin.Thread() as thread:
        question = dataset[20]['Question']
        fileinfo = dataset[20]['file_path']
        plan = marvin.plan(question)
        outputs = []
        for task in plan:
            print(f"\nExecuting task: {task.instructions}")
            # Ensure file path is included in context
            res = marvin.run(
                task.instructions,
                agents=[orchestration_agent],
                context={
                    "question": question,
                    "fileinfo": fileinfo,  # This should be the full path to your image
                    "task": task,
                    "file_path": fileinfo  # Include both for compatibility
                }
            )
            outputs.append(res)
```
Key changes made:
1. Added direct image task detection
2. Improved context passing for image tasks
3. Made the image agent instructions more explicit
4. Added more debug logging
5. Ensured file paths are properly passed through the chain
Could you run this version and share:
1. The debug output showing which agent is selected
2. Whether the image function is being called
3. The full path to the image that's being passed
This will help us identify if:
- The task is being correctly identified as an image task
- The image agent is being selected
- The file path is being properly passed through the chainSanaz
07/09/2025, 6:42 PMMarvin
07/09/2025, 6:42 PMMarvin
07/09/2025, 6:43 PMMarvin
07/09/2025, 6:43 PMSanaz
07/09/2025, 6:50 PMMarvin
07/09/2025, 6:50 PMMarvin
07/09/2025, 6:50 PMpython
# Example of context usage in Marvin agents
@fn
def orchestration_agent(task: dict, context: dict) -> str:
# Context received from marvin.run() call
print(f"Context received: {context}")
# Context can include things like:
file_path = context.get("file_path") # File paths
question = context.get("question") # Original question
task_info = context.get("task") # Task information
previous_results = context.get("previous_results") # Results from previous tasks
# You can also access global context using marvin.context
global_context = marvin.context.get()
# When calling another agent, you can pass modified context
modified_context = {
"image_path": file_path,
"Q": question,
"instructions": task_info,
# Add any other relevant information
}
result = marvin.run(
instructions,
agents=[chosen_agent],
context=modified_context # This context will be available to the next agent
)
Context is used for:
1. Passing Data Between Agents:
python
# In main workflow
res = marvin.run(
task.instructions,
agents=[orchestration_agent],
context={
"question": original_question,
"file_path": image_path,
"previous_results": previous_outputs
}
)
2. Accessing Task-Specific Information:
python
@fn
def gpt_4o_image_function(image_path: str, Q: str) -> str:
# Get additional context beyond function parameters
context = marvin.context.get()
specific_instructions = context.get("instructions")
original_task = context.get("task")
3. Maintaining State:
python
@fn
def process_with_history(input_data: str) -> str:
context = marvin.context.get()
previous_results = context.get("previous_results", [])
# Use previous results to inform current processing
4. Sharing Configuration:
python
# Set up common configuration in main workflow
common_context = {
"model_config": {"temperature": 0.7},
"api_keys": {"service_key": "xyz"},
"file_paths": {"base_dir": "/path/to/files"}
}
res = marvin.run(
instructions,
agents=[agent],
context=common_context
)
5. Error Handling and Debugging:
python
from datetime import datetime

@fn
def safe_processing_agent(task: dict, context: dict) -> str:
    try:
        # Access debug information from context
        debug_mode = context.get("debug", False)
        if debug_mode:
            print(f"Processing task with context: {context}")
        result = process_task(task)
        return result
    except Exception as e:
        # Add error information to context
        error_context = {
            "error": str(e),
            "task": task,
            "timestamp": datetime.now()
        }
        marvin.context.update(error_context)
        raise
Best Practices for Context Usage:
1. Document Expected Context:
python
@fn
def my_agent(task: dict, context: dict) -> str:
"""
Agent that processes tasks with specific context requirements.
Expected context:
- file_path: str - Path to input file
- question: str - Original question
- task: dict - Task information
- previous_results: list - Results from previous tasks
"""
# Agent implementation
2. Validate Required Context:
python
@fn
def image_processing_agent(task: dict, context: dict) -> str:
    required_keys = ["image_path", "instructions"]
    missing_keys = [key for key in required_keys if key not in context]
    if missing_keys:
        return f"Error: Missing required context keys: {missing_keys}"
3. Maintain Context Chain:
```python
@fn
def orchestration_agent(task: dict, context: dict) -> str:
    # Preserve original context while adding new information
    modified_context = {
```
Marvin
07/09/2025, 6:50 PMFor your specific case with image processing, you might want to structure your context like this:
```python
# In main workflow
context = {
    "question": dataset[20]['Question'],
    "file_path": dataset[20]['file_path'],
    "task": current_task,
    "execution_info": {
        "step": current_step,
        "total_steps": total_steps
    },
    "image_processing": {
        "image_path": image_path,
        "instructions": specific_instructions,
        "expected_output": "list_of_numbers"
    }
}
# This context will be available throughout the execution chain
res = marvin.run(
    task.instructions,
    agents=[orchestration_agent],
    context=context
)
```
This structured approach to context helps maintain clear data flow and state management throughout your agent chain.Sanaz
07/09/2025, 7:14 PMMarvin
07/09/2025, 7:14 PMMarvin
07/09/2025, 7:15 PMMarvin
07/09/2025, 7:15 PMSanaz
07/09/2025, 7:16 PMMarvin
07/09/2025, 7:16 PMMarvin
07/09/2025, 7:16 PMMarvin
07/09/2025, 7:16 PMSanaz
07/09/2025, 7:42 PMMarvin
07/09/2025, 7:42 PMMarvin
07/09/2025, 7:43 PMpython
# First, define the image processing function
@fn
def gpt_4o_image_function(image_path: str, Q: str) -> str:
print("\n>>>>> IMAGE FUNCTION START <<<<<")
print(f"IMAGE PATH: {image_path}")
print(f"QUESTION: {Q}")
try:
with open(image_path, "rb") as image_file:
image_data = image_file.read()
print("Image file read successfully")
response = client.chat.completions.create(
model="gpt-4o",
messages=[{
"role": "user",
"content": [
{"type": "text", "text": f"Complete this specific task for the image: {Q}"},
{"type": "image_url", "image_url": {
"url": f"data:image/jpeg;base64,{base64.b64encode(image_data).decode('utf-8')}"
}}
]
}]
)
return response.choices[0].message.content
except Exception as e:
print(f"Error: {str(e)}")
return f"Error processing image: {str(e)}"
# Define the image agent properly
gpt_image_agent = marvin.Agent(
name="gpt-image",
instructions="""You are an image analysis specialist that uses GPT-4 Vision to:
1. Process and analyze images
2. Extract text and numbers from images
3. Identify colors and patterns
4. Return specific information as requested
Always use the provided image processing function to analyze images.""",
tools=[gpt_4o_image_function]
)
# Modify the orchestration agent
@fn
def orchestration_agent(task: dict, context: dict) -> str:
print("\n>>>>> ORCHESTRATION START <<<<<")
print(f"TASK RECEIVED: {task}")
print(f"CONTEXT RECEIVED: {context}")
instructions = task.get("instructions") if isinstance(task, dict) else str(task)
# Check for image task
is_image_task = any(word in instructions.lower() for word in [
"image", "picture", "photo", "ocr", "extract from image", "numbers in the image"
])
if is_image_task:
print("\n>>>>> ROUTING TO IMAGE AGENT")
# Use the image agent instead of the raw function
result = marvin.run(
instructions,
agents=[gpt_image_agent], # Use the agent, not the function
context={
"image_path": context.get("fileinfo"),
"Q": instructions
}
)
return result
# ... rest of the orchestration logic ...
# Main workflow
if __name__ == "__main__":
with marvin.Thread() as thread:
question = dataset[20]['Question']
fileinfo = dataset[20]['file_path']
print(f"\n>>>>> PROCESSING:")
print(f"QUESTION: {question}")
print(f"FILE PATH: {fileinfo}")
plan = marvin.plan(question)
outputs = []
for task in plan:
print(f"\n>>>>> EXECUTING TASK: {task.instructions}")
res = marvin.run(
task.instructions,
agents=[orchestration_agent],
context={
"question": question,
"fileinfo": fileinfo,
"task": task
}
)
outputs.append(res)
Key changes:
1. Made sure the GPT-4 Vision function is properly wrapped in an agent
2. Modified the orchestration agent to use the image agent instead of trying to use the function directly
3. Ensured proper context passing for image processing
4. Added more specific image-related keywords for task detection
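One thing to keep in mind with the keyword check: `word in instructions.lower()` matches substrings, so "photo" would also fire on "photosynthesis". A minimal sketch of whole-word matching, using only the standard library, could look like this. The names `IMAGE_KEYWORDS` and `is_image_task` are hypothetical, and the keyword list is only a subset chosen for illustration.

```python
import re

# Hypothetical keyword list, a subset of the keywords used in the orchestration code
IMAGE_KEYWORDS = ("image", "picture", "photo", "ocr")


def is_image_task(instructions: str) -> bool:
    """Return True if the instructions mention an image keyword as a whole word."""
    text = instructions.lower()
    # \b anchors avoid substring hits such as "photosynthesis" matching "photo";
    # the optional "s" lets simple plurals such as "images" match
    return any(re.search(rf"\b{re.escape(word)}s?\b", text) for word in IMAGE_KEYWORDS)
```

A generic word like "numbers" is left out of this list on purpose, since it would route any numeric task to the image agent.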
The error you're seeing suggests that:
1. Either the function is being called directly instead of through the agentMarvin
07/09/2025, 7:43 PMSanaz
07/09/2025, 7:43 PMMarvin
07/09/2025, 7:43 PMMarvin
07/09/2025, 7:44 PMpython
orchestration_agent = marvin.Agent(
name="orchestrator",
tools=[get_image_description_gpt4o] # Direct function as tool
)
vs.
2. Using a specialized agent with tools:
python
gpt_image_agent = marvin.Agent(
name="gpt-image",
tools=[gpt_4o_image_function]
)
orchestration_agent = marvin.Agent(
name="orchestrator",
tools=[...] # Other tools
)
The output you got shows exactly how Marvin handles direct tool calls:
Agent "orchestrator" (1396b4fc)
Tool: get_image_description_gpt4o
Status: ✅
Input {
'image_path': 'C:\\Users\\a\\.cache\\huggingface\\hub\\datasets--gaia-benchmark--GAIA\\snaps…
'Q': 'Identify the red and green numbers in the image.'
}
This demonstrates that:
1. The tool is called directly with the provided parameters
2. There's no agent-level instruction processing
3. The function receives raw inputs without any agent-level context or processing
This is useful for understanding how Marvin's tool execution works at a basic level. For production use, you'd typically want to use the agent-based approach as it provides:
- Better context handling
- More sophisticated instruction processing
- Proper error handling
- Chain of thought reasoning
Would you like to experiment with any other aspects of tool vs. agent behavior?Sanaz
07/15/2025, 2:49 PMMarvin
07/15/2025, 2:49 PMMarvin
07/15/2025, 2:50 PM`marvin.plan()` works:
1. Core Functionality
- `marvin.plan()` is a function designed to break down complex objectives into smaller, manageable tasks
- It returns a list of `Task` objects that represent discrete steps needed to achieve the main objective
2. Key Parameters
- `instructions`: The main objective or goal you want to break down
- `agent`: Optional custom agent to handle the planning (if not specified, uses default)
- `thread`: Optional conversation thread for context
- `context`: Additional contextual information as a dictionary
- `available_agents`: List of agents that can be used for task execution
- `tools`: List of functions that can be used during task execution
- `parent_task`: Optional parent task if this is part of a larger task hierarchy
3. Planning Process
The function uses a prompt-based approach to break down tasks. The planner is instructed to:
- Create tasks that represent discrete, observable goals
- Avoid tasks that are too small (which could create unnecessary overhead)
- Avoid tasks that are too large (which would be difficult to manage)
- Ensure each task is clearly defined and actionable
4. Usage Example
python
from marvin import plan
tasks = plan(
instructions="Build a data pipeline that processes customer data",
context={"data_source": "CSV files", "output_format": "PostgreSQL"}
)
# Each returned task will have properties like:
# - description
# - status
# - dependencies
# - etc.
5. Async Support
- The function also supports async operations for use in asynchronous workflows
You can find more detailed documentation about `marvin.plan()` at:
- https://github.com/PrefectHQ/marvin/tree/main/docs/functions/plan.mdx
- https://askmarvin.ai/api-reference/marvin-fns-plan
Let me know if you'd like to see specific examples of how to use `marvin.plan()` in different scenarios or if you have any other questions!
Sanaz
07/15/2025, 4:12 PMMarvin
07/15/2025, 4:12 PMMarvin
07/15/2025, 4:13 PMpython
import marvin
# Change the default model for all agents
marvin.defaults.model = "openai:gpt-4" # or any other supported model
2. Per-Agent Configuration
python
from marvin import Agent
# Create an agent with a specific model
agent = Agent(
model="openai:gpt-4", # Specify the model
model_settings={
"temperature": 0.7, # Optional: Adjust model parameters
"max_tokens": 1000
}
)
3. Available Model Formats
The model string should be in the format: provider:model_name
Examples:
python
# OpenAI models
"openai:gpt-4"
"openai:gpt-3.5-turbo"
4. Setting OpenAI API Key
Before using any OpenAI models, make sure to set your API key:
bash
export OPENAI_API_KEY="your-api-key"
Or in Python:
python
import os
os.environ["OPENAI_API_KEY"] = "your-api-key"
5. Model Settings
You can customize various model parameters:
python
model_settings = {
    "temperature": 0.7,      # Controls randomness (OpenAI accepts 0.0 to 2.0)
    "max_tokens": 1000,      # Maximum tokens in response
    "top_p": 1.0,            # Nucleus sampling parameter
    "frequency_penalty": 0,  # Penalizes tokens in proportion to how often they have already appeared
    "presence_penalty": 0    # Penalizes tokens that have appeared at all, nudging toward new topics
}
agent = Agent(model="openai:gpt-4", model_settings=model_settings)
Key Points to Remember:
- The default model is OpenAI's GPT-4
- Changes to `marvin.defaults.model` affect all agents created afterward
- Individual agents can override the default settings
- Always ensure you have the appropriate API keys set up for the model provider you're using
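To make the `provider:model_name` format concrete, here is a small sketch that splits and validates such a string. This is not how Marvin parses model names internally; `split_model_string` is a hypothetical helper written only to illustrate the format.

```python
def split_model_string(model: str) -> tuple[str, str]:
    """Split 'provider:model_name' into its two parts, raising ValueError if malformed."""
    provider, sep, name = model.partition(":")
    if not sep or not provider or not name:
        raise ValueError(f"Expected 'provider:model_name', got {model!r}")
    return provider, name


print(split_model_string("openai:gpt-4"))
```

A check like this can catch a bare model name (for example `gpt-4` without a provider prefix) before it reaches the agent.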
You can find more detailed documentation about LLM configuration at:
- https://github.com/PrefectHQ/marvin/tree/main/docs/guides/configure-llms.mdx
- https://askmarvin.ai/api-reference/marvin-utilities-models
Let me know if you need any clarification or have questions about specific model configurations!Sanaz
07/17/2025, 4:37 PMMarvin
07/17/2025, 4:37 PMMarvin
07/17/2025, 4:38 PMbash
pip install marvin
Using uv (a faster alternative to pip):
bash
# Install uv first if you haven't already
curl -LsSf https://astral.sh/uv/install.sh | sh
# Then install Marvin
uv pip install marvin
3. Set up API Key
After installation, set up your OpenAI API key (required for using the default LLM):
bash
export OPENAI_API_KEY="your-api-key"
4. Verify Installation
You can verify the installation by running Python and importing Marvin:
python
import marvin
5. Optional: Install in a Virtual Environment
It's recommended to install Marvin in a virtual environment:
bash
# Create a virtual environment
python -m venv venv
# Activate it (on Unix/macOS)
source venv/bin/activate
# Activate it (on Windows)
.\venv\Scripts\activate
# Install Marvin
pip install marvin
6. Additional Installation Options
- To install with specific extras or development dependencies:
bash
pip install "marvin[dev]" # Install with development dependencies
After installation, you can start using Marvin in your Python environment. Here's a quick test to ensure everything is working:
python
from marvin import fn
@fn
def greet(name: str) -> str:
    """Generate a friendly greeting."""
    pass
# Test the function
print(greet("World"))
If you encounter any issues during installation:
1. Make sure you're using Python 3.10 or higher
2. Verify your OpenAI API key is correctly set
3. Check that all dependencies were installed correctly
You can find more detailed installation instructions in the documentation at:
https://github.com/PrefectHQ/marvin/tree/main/docs/installation.mdx
Let me know if you need help with any specific installation scenarios or encounter any issues!Sanaz
07/22/2025, 2:59 PMMarvin
07/22/2025, 3:00 PMMarvin
07/22/2025, 3:00 PMSanaz
07/22/2025, 4:00 PMMarvin
07/22/2025, 4:00 PMMarvin
07/22/2025, 4:01 PMpython
from marvin import Agent, Memory
# Create a memory instance
memory = Memory()
# Create an agent with memory
agent = Agent(
name="MyAgent",
memories=[memory]
)
2. Storing Information:
python
# Store information in memory
await memory.store("The user prefers Python over JavaScript")
# Store multiple facts
await memory.store([
"The user is working on a data pipeline",
"The user's name is Alice"
])
3. Retrieving Information:
python
# Retrieve relevant memories
relevant_memories = await memory.recall("What does the user prefer?")
4. Using Memory in Conversations:
python
from marvin import Agent, Memory, assistant
@assistant()
async def coding_assistant(message: str, memory: Memory):
# Recall relevant information before responding
context = await memory.recall(message)
# Use the context in your response
response = f"Based on what I remember: {context}\n"
response += "Here's my response to your question..."
# Store new information from the conversation
await memory.store(f"User asked about: {message}")
return response
Key Features:
- Vector Database Backend: Marvin uses a vector database for efficient storage and retrieval
- Semantic Search: Memory recall is based on semantic similarity, not just exact matches
- Persistence: Memories persist across different sessions
- Context-Aware: Can retrieve relevant information based on the current conversation
Best Practices:
1. Store specific, actionable information rather than entire conversations
2. Use clear, consistent formatting for stored memories
3. Consider implementing memory cleanup strategies for older or less relevant information
4. Use memory.recall() with specific queries to get the most relevant information
Would you like me to provide more specific examples for any of these aspects, or do you have a particular use case in mind for the memory system?Sanaz
07/22/2025, 4:48 PMMarvin
07/22/2025, 4:48 PMMarvin
07/22/2025, 4:49 PM`BaseTool` from `langchain_core.tools`, integrating it directly with Marvin agents isn't the recommended approach. Instead, Marvin has its own tools system that's designed to work with its agent architecture.
Here's what you should consider:
1. Marvin's Native Tools Approach:
python
from marvin import tool
@tool
async def my_custom_tool(input_parameter: str) -> str:
"""
Description of what this tool does
"""
# Your tool implementation here
return result
2. If you need LangChain functionality, you have two main options:
a. Wrap LangChain tools in Marvin tools:
python
from marvin import tool
from langchain_core.tools import BaseTool
# Wrap your LangChain tool functionality in a Marvin tool
@tool
async def langchain_wrapper(input_parameter: str) -> str:
"""
Description of what this wrapped tool does
"""
# Implement your LangChain tool logic here
# but wrap it in Marvin's tool decorator
return result
b. Use Marvin's native capabilities that might provide similar functionality to what you're trying to achieve with LangChain tools.
Best Practices:
1. Start with Marvin's native tools system first
2. Only integrate LangChain tools if there's specific functionality you need that isn't available in Marvin
3. When wrapping LangChain tools, make sure to handle async/sync compatibility properly
4. Document clearly when you're using wrapped external tools
Would you like to:
1. Share what specific LangChain tool functionality you're trying to use? This would help me suggest the best Marvin-native alternative.
2. See an example of how to implement a specific type of tool in Marvin?
3. Learn more about Marvin's native tools system?Sanaz
07/29/2025, 6:49 PMMarvin
07/29/2025, 6:49 PMMarvin
07/29/2025, 6:50 PMpython
from marvin import tool
@tool
def calculate_metrics(data: list[float], threshold: float = 0.5) -> dict:
"""
Calculate metrics for a list of numbers based on a threshold.
Args:
data: List of numerical values to analyze
threshold: Cutoff value for analysis (default: 0.5)
Returns:
Dictionary containing calculated metrics
"""
# Function implementation
pass
2. How Marvin Understands Arguments
Marvin uses several pieces of information to understand how to pass arguments to your tools:
- Type Hints: The type annotations (like `list[float]`, `float`) help Marvin understand what kind of data to pass
- Docstrings: The documentation helps Marvin understand the purpose and expected format of each argument
- Function Signature: Marvin can inspect the function to see required vs optional arguments
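To show what those three sources of information look like from Python's side, here is a minimal sketch using only the standard library. It is not Marvin's internal implementation; `describe_tool` is a hypothetical helper, and the function body of `calculate_metrics` is a placeholder added for illustration.

```python
import inspect
from typing import get_type_hints


def describe_tool(func) -> dict:
    """Collect the signature details a caller could use to build arguments for func."""
    signature = inspect.signature(func)
    hints = get_type_hints(func)
    params = {}
    for name, param in signature.parameters.items():
        params[name] = {
            "type": hints.get(name),
            # A parameter without a default value must be supplied by the caller
            "required": param.default is inspect.Parameter.empty,
        }
    return {
        "name": func.__name__,
        "doc": inspect.getdoc(func),
        "params": params,
        "returns": hints.get("return"),
    }


def calculate_metrics(data: list[float], threshold: float = 0.5) -> dict:
    """Calculate metrics for a list of numbers based on a threshold."""
    return {"above": sum(1 for value in data if value > threshold)}


info = describe_tool(calculate_metrics)
print(info["params"])
```

If a parameter shows up here with a missing type or an empty docstring, an agent has correspondingly less to go on when deciding what to pass.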
3. Agent Usage
When an agent needs to use a tool, it:
- Analyzes the tool's requirements based on the function signature
- Determines what arguments are needed
- Formats the available data to match the expected types
- Calls the function with appropriate arguments
Here's an example of how to set this up:
python
from marvin import Agent, tool
from prefect import flow
# Define your tools
@tool
def process_data(input_text: str, max_length: int = 100) -> str:
    """
    Process input text with a maximum length constraint.
    Args:
        input_text: The text to process
        max_length: Maximum allowed length
    """
    return input_text[:max_length]
# Create an agent with tools
agent = Agent(
    tools=[process_data],
    instructions="You are an agent that processes text data."
)
@flow
def process_text_flow(text: str):
    # The agent will automatically understand how to use the tool
    # and pass the appropriate arguments
    result = agent.run(f"Process this text: {text}")
    return result
4. Best Practices for Tool Arguments:
- Always use type hints for your arguments
- Provide clear, detailed docstrings
- Use descriptive parameter names
- Set sensible defaults for optional arguments
- Keep the function signature simple and focused
5. Troubleshooting Tips:
If your agent isn't passing arguments correctly:
- Check that your type hints are correct
- Verify your docstrings are clear and detailed
- Ensure the agent has the proper context to understand when and how to use the tool
- Consider adding example usage in the docstring
Remember that Marvin's ability to understand and use your tools depends heavily on how well they are documented and typed. The more information you provide through type hints and docstrings, the better Marvin can understand how to use the tools correctly.
Would you like to share any specific examples of the tool implementations you're working with? That would help me provide more targeted advice for your use case.
Sanaz
07/29/2025, 7:12 PMMarvin
07/29/2025, 7:12 PMMarvin
07/29/2025, 7:13 PMpython
from marvin import Agent
# Using OpenAI's GPT-4
agent = Agent(
model="openai:gpt-4",
instructions="Your agent instructions here"
)
# Using GPT-3.5-turbo
agent = Agent(
model="openai:gpt-3.5-turbo",
instructions="Your agent instructions here"
)
# Using Anthropic's Claude
agent = Agent(
model="anthropic:claude-2",
instructions="Your agent instructions here"
)
2. With Model Settings
You can also specify model settings along with the model:
python
agent = Agent(
model="openai:gpt-4",
model_settings={
"temperature": 0.7,
"max_tokens": 500,
"top_p": 1.0
},
instructions="Your agent instructions here"
)
3. Change Default Model Globally
You can set the default model for all agents in your application:
python
import marvin
# Set default model for all agents
marvin.settings.model = "openai:gpt-4"
4. Supported Model Providers
Marvin supports several LLM providers through Pydantic AI:
- OpenAI (`openai:`)
  - gpt-4
  - gpt-3.5-turbo
- Anthropic (`anthropic:`)
  - claude-2
  - claude-instant
- Cohere (`cohere:`)
- Google (`google:`)
- Bedrock (`bedrock:`)
- Groq (`groq:`)
- Mistral (`mistral:`)
5. Environment Configuration
Remember to set up your API keys in environment variables for the provider you're using:
bash
# For OpenAI
export OPENAI_API_KEY=your_api_key
# For Anthropic
export ANTHROPIC_API_KEY=your_api_key
# For other providers, check their specific environment variable requirements
6. Advanced Configuration
For more advanced configurations, you can use the provider-specific model classes:
python
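from pydantic_ai.models.openai import OpenAIModel
# The model class takes the model name; sampling options such as temperature
# and max_tokens go in the agent's model_settings instead
custom_model = OpenAIModel("gpt-4")
agent = Agent(
    model=custom_model,
    model_settings={"temperature": 0.7, "max_tokens": 500},
    instructions="Your agent instructions here"
)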
Important Notes:
- Make sure you have the appropriate API keys set up for the model provider you choose
- Different models may have different capabilities and costs
- Some models might require additional configuration or dependencies
- Performance and response quality can vary between models
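As a quick pre-flight check for the first note, here is a minimal sketch that reports which API key variable is missing for a given model string. It only covers the two variables named in this thread (`OPENAI_API_KEY` and `ANTHROPIC_API_KEY`); the names `PROVIDER_ENV_VARS` and `missing_api_key` are hypothetical, and other providers would need their own entries.

```python
import os
from typing import Mapping, Optional

# Only the two variables named in this thread; other providers are left out on purpose
PROVIDER_ENV_VARS = {
    "openai": "OPENAI_API_KEY",
    "anthropic": "ANTHROPIC_API_KEY",
}


def missing_api_key(model: str, env: Optional[Mapping[str, str]] = None) -> Optional[str]:
    """Return the name of the unset API key variable for 'provider:model', or None."""
    env = os.environ if env is None else env
    provider = model.split(":", 1)[0]
    variable = PROVIDER_ENV_VARS.get(provider)
    if variable is not None and not env.get(variable):
        return variable
    return None
```

Calling `missing_api_key("openai:gpt-4")` before creating the agent gives a clearer message than an authentication error halfway through a run.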
Would you like me to provide more specific examples for any particular model provider or configuration scenario you're interested in?Sanaz
07/29/2025, 7:22 PMMarvin
07/29/2025, 7:22 PMMarvin
07/29/2025, 7:22 PMpython
import os
from marvin import Agent
# Set the environment variables
os.environ["OPENAI_API_BASE"] = "http://127.0.0.1:your_port"  # Your base URL
os.environ["OPENAI_API_KEY"] = "SS" # Your API key
# Create the agent (it will automatically use the environment variables)
agent = Agent(
model="openai:your-model-name", # Replace with your model name
instructions="Your instructions here"
)
2. Using Custom OpenAI Configuration:
python
from marvin import Agent
from pydantic_ai.models.openai import OpenAIModel
from pydantic_ai.providers.openai import OpenAIProvider
import openai
# Create custom OpenAI client (pydantic_ai expects the async client)
client = openai.AsyncOpenAI(
    base_url="http://127.0.0.1:your_port",
    api_key="SS"
)
# Create custom OpenAI provider
provider = OpenAIProvider(openai_client=client)
# Create custom model configuration
model = OpenAIModel(
    "your-model-name",  # Replace with your model name
    provider=provider
)
# Create the agent with the custom model
agent = Agent(
    model=model,
    instructions="Your instructions here"
)
3. Alternative Configuration Using Settings:
python
import marvin
from marvin import Agent
# Configure Marvin settings
marvin.settings.openai.api_base = "http://127.0.0.1:your_port"
marvin.settings.openai.api_key = "SS"
# Create the agent
agent = Agent(
model="openai:your-model-name", # Replace with your model name
instructions="Your instructions here"
)
Important Notes:
- Replace `your_port` with the actual port number from your base URL
- Replace `your-model-name` with the actual model name your endpoint is serving
- Make sure your custom endpoint is fully compatible with the OpenAI API format
- If your endpoint requires any additional headers or authentication, you might need to configure those as well
To test if it's working:
python
# Test the agent
result = agent.run("Hello, are you connected to the correct model?")
print(result)
If you encounter any issues, you might need to:
1. Verify the endpoint is accessible
2. Check if your model name matches exactly what the endpoint expects
3. Ensure the API key format is correct
4. Verify any additional headers or authentication requirements
Would you like me to provide more specific guidance based on your particular setup or are you encountering any specific errors?Sanaz
07/29/2025, 7:34 PMMarvin
07/29/2025, 7:34 PMMarvin
07/29/2025, 7:35 PM`marvin.plan()`, you can change the model in several ways:
1. Using Environment Variables (Recommended):
python
import os
import marvin
# Set environment variables before using plan()
os.environ["MARVIN_AGENT_MODEL"] = "openai:gpt-4" # or your desired model
os.environ["OPENAI_API_KEY"] = "your-api-key"
# Now use plan()
result = marvin.plan("Your planning prompt here")
2. Using Marvin Settings:
python
import marvin
# Configure settings before using plan()
marvin.settings.model = "openai:gpt-3.5-turbo"
marvin.settings.model_settings = {
"temperature": 0.7,
"max_tokens": 500
}
result = marvin.plan("Your planning prompt here")
3. For Custom OpenAI-compatible Endpoints (like your case):
python
import marvin
import os
# Set custom endpoint
os.environ["OPENAI_API_BASE"] = "http://127.0.0.1:your_port"
os.environ["OPENAI_API_KEY"] = "SS"
os.environ["MARVIN_AGENT_MODEL"] = "your-model-name"
# Use plan()
result = marvin.plan("Your planning prompt here")
4. Using a Configuration File:
Create a `.env` file in your project root:
env
MARVIN_AGENT_MODEL=your-model-name
OPENAI_API_BASE=http://127.0.0.1:your_port
OPENAI_API_KEY=SS
Then in your code:
python
from dotenv import load_dotenv
import marvin
load_dotenv()
result = marvin.plan("Your planning prompt here")
5. Advanced Configuration with Custom Provider:
python
import marvin
from pydantic_ai.models.openai import OpenAIModel
from pydantic_ai.providers.openai import OpenAIProvider
import openai
# Create custom client (pydantic_ai expects the async client)
client = openai.AsyncOpenAI(
    base_url="http://127.0.0.1:your_port",
    api_key="SS"
)
# Configure provider and model
provider = OpenAIProvider(openai_client=client)
model = OpenAIModel(
    "your-model-name",
    provider=provider
)
# Set as default model
marvin.settings.model = model
# Use plan()
result = marvin.plan("Your planning prompt here")
Important Notes:
- Make sure to set up your configuration before calling marvin.plan()
- The model name should match exactly what your endpoint expects
- If you're using a custom endpoint, ensure it's fully compatible with the OpenAI API format
- You might need to adjust the temperature and other settings based on your specific needs
To verify the configuration:
python
# Test the configuration
print(f"Current model: {marvin.settings.model}")
print(f"Current model settings: {marvin.settings.model_settings}")
Would you like me to provide more specific guidance based on your particular setup or are you encountering any specific issues?Sanaz
07/30/2025, 2:38 AMMarvin
07/30/2025, 2:38 AMMarvin
07/30/2025, 2:40 AM`marvin.settings.model = model` is incorrect. For `marvin.plan()`, the primary way to configure a custom endpoint is through environment variables. Here's the correct approach:
1. Using Environment Variables (Most Reliable Method):
python
import os
import marvin
# Set these before using marvin.plan()
os.environ["OPENAI_API_BASE"] = "http://127.0.0.1:your_port"
os.environ["OPENAI_API_KEY"] = "SS"
# Use plan()
result = marvin.plan("Your planning prompt here")
2. Using a .env file:
Create a .env file:
env
OPENAI_API_BASE=http://127.0.0.1:your_port
OPENAI_API_KEY=SS
Then in your code:
python
from dotenv import load_dotenv
import marvin
load_dotenv()
result = marvin.plan("Your planning prompt here")
If you're encountering any specific errors or issues with these approaches, please let me know and I can help troubleshoot. Also, if you're using a specific model name from your custom endpoint, you might need to specify that as well through the appropriate environment variable.
Would you like me to provide more specific guidance based on your particular setup or any errors you're seeing?
Sanaz
07/30/2025, 7:47 PM
Marvin
07/30/2025, 7:47 PM
Marvin
07/30/2025, 7:47 PM
python
import os
# Make sure the URL format is exactly correct - common issues include:
# - Missing http:// or https://
# - Wrong port number
# - Missing or extra slashes
os.environ["OPENAI_API_BASE"] = "http://127.0.0.1:1234"  # adjust port number
os.environ["OPENAI_API_KEY"] = "SS"
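The URL mistakes listed in the comments above can be caught mechanically before any request is sent. The following is a rough sketch using only `urllib.parse` from the standard library. `base_url_problems` is a hypothetical helper, and the checks are a simple heuristic, not an exhaustive validator.

```python
from typing import List
from urllib.parse import urlsplit


def base_url_problems(url: str) -> List[str]:
    """Return human-readable problems found in an endpoint base URL."""
    problems: List[str] = []
    # Chat clients sometimes wrap links in <...>; those characters break the URL.
    cleaned = url.strip().strip("<>")
    if cleaned != url:
        problems.append("stray whitespace or angle brackets")
    parts = urlsplit(cleaned)
    if parts.scheme not in ("http", "https"):
        problems.append("missing http:// or https://")
    elif not parts.hostname:
        problems.append("missing host")
    try:
        parts.port  # accessing .port raises ValueError for a non-numeric port
    except ValueError:
        problems.append("port is not a valid number")
    if "//" in parts.path:
        problems.append("extra slashes in path")
    return problems


if __name__ == "__main__":
    for candidate in ("http://127.0.0.1:1234", "127.0.0.1:1234"):
        print(candidate, "->", base_url_problems(candidate) or "looks fine")
```

An empty list only means the string is well formed. It does not show that anything is listening at that address.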
2. Verify the endpoint is running
Before using Marvin, try testing the connection directly with the OpenAI client:
python
import openai
client = openai.Client(
    base_url="http://127.0.0.1:1234",  # your actual port
    api_key="SS"
)
# Test the connection
try:
    # Basic test call
    response = client.chat.completions.create(
        model="your-model-name",
        messages=[{"role": "user", "content": "Hello"}]
    )
    print("Connection successful!")
except Exception as e:
    print(f"Connection failed: {str(e)}")
3. Check for SSL/HTTPS issues
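If your endpoint is served over https:// with a self-signed certificate, you might need to disable certificate verification. A plain http:// endpoint uses no TLS, so this step does not apply to it:
python
import os
os.environ["OPENAI_API_BASE"] = "http://127.0.0.1:1234"
os.environ["OPENAI_API_KEY"] = "SS"
# Only relevant for https:// endpoints. This variable name is unverified,
# so confirm that your client library actually reads it.
os.environ["OPENAI_VERIFY_SSL"] = "false"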
Could you please provide:
1. The exact error message you're getting
2. Whether the endpoint is accessible directly using the OpenAI client (without Marvin)
3. The exact URL format you're using (you can obscure sensitive parts)
This information would help me provide a more targeted solution for your specific setup.
Sanaz
07/30/2025, 7:49 PM
Marvin
07/30/2025, 7:49 PM
Marvin
07/30/2025, 7:50 PM
python
import marvin
import os
from typing import Dict, List
import time
import json
from datetime import datetime
class AgentEvaluator:
    def __init__(self, test_cases: List[dict], models: List[str]):
        self.test_cases = test_cases
        self.models = models
        self.results = {}
    def run_evaluation(self):
        for model in self.models:
            self.results[model] = self._evaluate_model(model)
    def _evaluate_model(self, model_identifier: str) -> List[dict]:
        results = []
        # Configure environment for the model
        self._setup_model_env(model_identifier)
        for test_case in self.test_cases:
            try:
                start_time = time.time()
                # Create agent with the specific model
                agent = marvin.Agent(
                    model=model_identifier,
                    instructions=test_case.get('instructions', "You are a helpful AI assistant.")
                )
                # Run the test
                response = agent.run(test_case['input'])
                end_time = time.time()
                result = {
                    'test_case_id': test_case['id'],
                    'input': test_case['input'],
                    'output': response,
                    'execution_time': end_time - start_time,
                    'success': True,
                    'error': None
                }
            except Exception as e:
                result = {
                    'test_case_id': test_case['id'],
                    'input': test_case['input'],
                    'output': None,
                    'execution_time': None,
                    'success': False,
                    'error': str(e)
                }
            results.append(result)
        return results
    def _setup_model_env(self, model_identifier):
        # Configure environment variables based on model
        if model_identifier.startswith('openai:'):
            os.environ["OPENAI_API_KEY"] = "your-key"
        # Add other model configurations as needed
    def save_results(self, filename=None):
        if filename is None:
            filename = f"agent_evaluation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(filename, 'w') as f:
            # default=str keeps the dump from failing on non-JSON outputs
            json.dump(self.results, f, indent=2, default=str)
    def print_summary(self):
        for model, results in self.results.items():
            successes = [r for r in results if r['success']]
            success_rate = len(successes) / len(results) if results else 0.0
            # Average only over successful runs; failed runs have no timing
            avg_time = (
                sum(r['execution_time'] for r in successes) / len(successes)
                if successes else 0.0
            )
            print(f"\nModel: {model}")
            print(f"Success Rate: {success_rate*100:.2f}%")
            print(f"Average Execution Time: {avg_time:.2f} seconds")
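The summary statistics can also be computed separately from the printing, which makes them easier to check. Below is a minimal sketch, assuming the same result dictionaries (`success`, `execution_time`) as the evaluator above; `summarize` is a hypothetical helper name. It averages over successful runs only, so failed cases (whose `execution_time` is `None`) neither break the sum nor skew the mean.

```python
from typing import Dict, List, Optional


def summarize(results: List[dict]) -> Dict[str, Optional[float]]:
    """Compute success rate and mean execution time for one model's results."""
    successes = [r for r in results if r["success"]]
    success_rate = len(successes) / len(results) if results else 0.0
    # No successful runs means there is no meaningful average to report.
    avg_time = (
        sum(r["execution_time"] for r in successes) / len(successes)
        if successes
        else None
    )
    return {"success_rate": success_rate, "avg_time": avg_time}


if __name__ == "__main__":
    sample = [
        {"success": True, "execution_time": 2.0},
        {"success": True, "execution_time": 4.0},
        {"success": False, "execution_time": None},
    ]
    print(summarize(sample))
```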
2. Define Test Cases
python
test_cases = [
    {
        'id': 1,
        'input': "Analyze this data and create a summary report",
        'instructions': "You are a data analysis expert.",
        'expected_output_type': 'report',
    },
    {
        'id': 2,
        'input': "Create a marketing strategy for a new product",
        'instructions': "You are a marketing strategist.",
        'expected_output_type': 'strategy',
    },
    # Add more test cases
]
# Define models to test
models_to_test = [
    "openai:gpt-4",
    "openai:gpt-3.5-turbo",
    "anthropic:claude-2",
    # Add your custom endpoint
    "your-custom-model"  # Configure this appropriately
]
3. Run the Evaluation
```python
# Initialize and run evaluator
evaluator = AgentEvaluator(test_cases, models_to_test)
evaluator.run_evaluation()
# Save results
evaluator.save_results()
```
Marvin
07/30/2025, 7:50 PM
4. Add Custom Metrics
```python
from typing import Callable, Dict, List, Optional
class MetricsCalculator:
    @staticmethod
    def response_length(response: str) -> int:
        return len(response.split())
    @staticmethod
    def contains_key_elements(response: str, required_elements: List[str]) -> float:
        found = sum(1 for elem in required_elements if elem.lower() in response.lower())
        return found / len(required_elements)
class EnhancedAgentEvaluator(AgentEvaluator):
    def __init__(self, test_cases: List[dict], models: List[str], metrics: Optional[Dict[str, Callable]] = None):
        super().__init__(test_cases, models)
        self.metrics = metrics or {
            'response_length': MetricsCalculator.response_length,
        }
    def _evaluate_model(self, model_identifier: str) -> List[dict]:
        results = super()._evaluate_model(model_identifier)
        # Add metrics calculations
        for result in results:
            if result['success']:
                result['metrics'] = {
                    metric_name: metric_func(result['output'])
                    for metric_name, metric_func in self.metrics.items()
                }
        return results
```
5. Use the Enhanced Evaluator
python
# Define custom metrics
custom_metrics = {
    'response_length': MetricsCalculator.response_length,
    'completeness': lambda x: MetricsCalculator.contains_key_elements(
        x, ['analysis', 'recommendation', 'conclusion']
    )
}
# Initialize enhanced evaluator
evaluator = EnhancedAgentEvaluator(
    test_cases=test_cases,
    models=models_to_test,
    metrics=custom_metrics
)
# Run evaluation
evaluator.run_evaluation()
6. For Your Custom Endpoint
python
import os
# Configure your custom endpoint before running evaluation
os.environ["OPENAI_API_BASE"] = "http://127.0.0.1:your_port"
os.environ["OPENAI_API_KEY"] = "SS"
# Add your custom model to the test
models_to_test = [
    "your-model-name",  # Your custom model
    "openai:gpt-4",  # For comparison
    # Add other models as needed
]
This framework allows you to:
- Test multiple models systematically
- Collect performance metrics
- Compare results across different models
- Save results for later analysis
- Add custom metrics for evaluation
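To illustrate the "compare results" and "later analysis" points, here is a small sketch that ranks models by success rate from a saved results file. It assumes the JSON layout written by `save_results()` above (a mapping from model name to a list of result dictionaries). `load_results` and `compare_models` are hypothetical helper names, and the file name in the usage line is a placeholder.

```python
import json
from typing import Dict, List, Tuple


def load_results(path: str) -> Dict[str, List[dict]]:
    """Read a results file that maps each model name to its list of runs."""
    with open(path) as f:
        return json.load(f)


def compare_models(results: Dict[str, List[dict]]) -> List[Tuple[str, float]]:
    """Return (model, success_rate) pairs sorted from best to worst."""
    rows = []
    for model, runs in results.items():
        passed = sum(1 for run in runs if run["success"])
        rows.append((model, passed / len(runs) if runs else 0.0))
    return sorted(rows, key=lambda row: row[1], reverse=True)


if __name__ == "__main__":
    import sys

    # Usage: python compare.py agent_evaluation_<timestamp>.json
    if len(sys.argv) > 1:
        for model, rate in compare_models(load_results(sys.argv[1])):
            print(f"{model}: {rate * 100:.1f}% success")
```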
Would you like me to explain any part in more detail or help you set up specific test cases for your agentic system?
Sanaz
07/30/2025, 8:25 PM
Marvin
07/30/2025, 8:25 PM
Marvin
07/30/2025, 8:26 PM
marvin.plan(), we need to configure it differently since it doesn't accept the model configuration in the same way as agents. Here's how to modify your code to work with a custom endpoint:
python
import os
import marvin
from openai import AsyncOpenAI
import asyncio
# Method 1: Using environment variables (Most reliable)
os.environ["OPENAI_API_BASE"] = "http://your-endpoint"  # Your actual endpoint
os.environ["OPENAI_API_KEY"] = "EMPTY"
# Optional: If your endpoint uses a specific model name
os.environ["MARVIN_MODEL_NAME"] = "default"  # Your model name
async def main():
    try:
        steps = await marvin.plan("Design a launch strategy for a mobile app.")
        print(steps)
    except Exception as e:
        print(f"Error occurred: {str(e)}")
        # Print more details about the error
        print(f"Error type: {type(e)}")
asyncio.run(main())
If you're still getting connection errors, you can try this alternative approach with more detailed error handling:
python
import os
import marvin
from openai import AsyncOpenAI
import asyncio
import httpx
# Configure the base settings
os.environ["OPENAI_API_BASE"] = "http://your-endpoint"
os.environ["OPENAI_API_KEY"] = "EMPTY"
# Optional: Disable SSL verification if needed
os.environ["OPENAI_VERIFY_SSL"] = "false"
async def test_connection():
    """Test the connection to the endpoint"""
    client = AsyncOpenAI(
        base_url=os.environ["OPENAI_API_BASE"],
        api_key=os.environ["OPENAI_API_KEY"],
        timeout=httpx.Timeout(30.0)  # Increase timeout if needed
    )
    try:
        # Test the connection with a simple completion
        response = await client.chat.completions.create(
            model="default",  # Your model name
            messages=[{"role": "user", "content": "Hello"}]
        )
        print("Connection test successful!")
        return True
    except Exception as e:
        print(f"Connection test failed: {str(e)}")
        return False
async def main():
    # First test the connection
    connection_ok = await test_connection()
    if connection_ok:
        try:
            steps = await marvin.plan("Design a launch strategy for a mobile app.")
            print(steps)
        except Exception as e:
            print(f"Error in marvin.plan(): {str(e)}")
    else:
        print("Skipping marvin.plan() due to connection test failure")
if __name__ == "__main__":
    asyncio.run(main())
If you're still experiencing issues, could you please:
1. Share the exact error message you're getting
2. Confirm if the test_connection() function works
3. Share your endpoint's URL format (you can obscure sensitive parts)
Also, make sure:
1. Your endpoint is running and accessible
2. The URL format is correct (including protocol http:// or https://)
3. The port number is correct
4. Your endpoint is compatible with the OpenAI API format
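The first three checks can be done without any API call by opening a plain TCP connection to the host and port in the URL. This is a minimal standard-library sketch; `endpoint_reachable` is a hypothetical helper, and the URL in the usage line is the placeholder address from this thread. A successful connection only shows that something is listening there, not that it speaks the OpenAI API format.

```python
import socket
from urllib.parse import urlsplit


def endpoint_reachable(base_url: str, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to the URL's host and port succeeds."""
    parts = urlsplit(base_url)
    try:
        port = parts.port  # raises ValueError for a non-numeric port
    except ValueError:
        return False
    if not parts.hostname:
        return False
    if port is None:
        # Fall back to the default port for the scheme.
        port = 443 if parts.scheme == "https" else 80
    try:
        with socket.create_connection((parts.hostname, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    url = "http://127.0.0.1:1234"  # placeholder; use your actual endpoint
    print(f"{url} reachable: {endpoint_reachable(url)}")
```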
Would you like me to provide any additional troubleshooting steps based on specific errors you're seeing?
Sanaz
07/30/2025, 8:29 PM
Marvin
07/30/2025, 8:29 PM
Marvin
07/30/2025, 8:30 PM
marvin.plan() is still failing. This suggests that Marvin might be using a different configuration path for its calls. Let's try a more comprehensive solution:
python
import os
import marvin
from openai import AsyncOpenAI
import asyncio
import httpx
from marvin import settings
# More comprehensive environment setup
os.environ.update({
    "OPENAI_API_BASE": "http://your-endpoint",
    "OPENAI_API_KEY": "EMPTY",
    "MARVIN_OPENAI_API_BASE": "http://your-endpoint",  # Try adding this
    "MARVIN_OPENAI_API_KEY": "EMPTY",  # And this
    "OPENAI_VERIFY_SSL": "false",  # If needed
})
# Try to directly configure Marvin's settings
settings.openai.api_base = "http://your-endpoint"
settings.openai.api_key = "EMPTY"
async def test_connection():
    """Test the connection to the endpoint"""
    client = AsyncOpenAI(
        base_url=os.environ["OPENAI_API_BASE"],
        api_key=os.environ["OPENAI_API_KEY"],
        timeout=httpx.Timeout(30.0),
        http_client=httpx.AsyncClient(verify=False)  # If needed for SSL
    )
    try:
        response = await client.chat.completions.create(
            model="default",
            messages=[{"role": "user", "content": "Hello"}]
        )
        print("Connection test successful!")
        print(f"Model responded with: {response.choices[0].message.content}")
        return True
    except Exception as e:
        print(f"Connection test failed: {str(e)}")
        return False
async def custom_plan():
    """Alternative to marvin.plan() using direct API calls"""
    client = AsyncOpenAI(
        base_url=os.environ["OPENAI_API_BASE"],
        api_key=os.environ["OPENAI_API_KEY"],
        timeout=httpx.Timeout(30.0),
        http_client=httpx.AsyncClient(verify=False)  # If needed for SSL
    )
    messages = [
        {"role": "system", "content": "You are a helpful AI assistant that creates detailed plans."},
        {"role": "user", "content": "Design a launch strategy for a mobile app."}
    ]
    try:
        response = await client.chat.completions.create(
            model="default",
            messages=messages,
            temperature=0.7,
        )
        return response.choices[0].message.content
    except Exception as e:
        print(f"Custom plan failed: {str(e)}")
        return None
async def main():
    # First test the connection
    connection_ok = await test_connection()
    if connection_ok:
        print("\nTrying marvin.plan()...")
        try:
            steps = await marvin.plan("Design a launch strategy for a mobile app.")
            print("Marvin plan succeeded:")
            print(steps)
        except Exception as e:
            print(f"Error in marvin.plan(): {str(e)}")
            print("\nTrying custom plan implementation...")
            custom_steps = await custom_plan()
            if custom_steps:
                print("Custom plan succeeded:")
                print(custom_steps)
if __name__ == "__main__":
    asyncio.run(main())
If you're still getting the connection error with marvin.plan(), we can try to debug it further:
1. Add debug logging:
python
import logging
# Add this before running the main code
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("marvin")
logger.setLevel(logging.DEBUG)
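Setting the root logger to DEBUG tends to flood the console with output from every library. A narrower variant is sketched below: it keeps the root at INFO and raises only selected loggers to DEBUG. The logger names `httpx` and `openai` are assumptions about which libraries sit underneath the request path in your installation, and `enable_debug_logging` is a hypothetical helper.

```python
import logging
from typing import Iterable


def enable_debug_logging(names: Iterable[str] = ("marvin", "httpx", "openai")) -> None:
    """Emit DEBUG records for the named loggers while keeping the root at INFO."""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(name)s %(levelname)s %(message)s",
    )
    for name in names:
        # Records from these loggers still propagate to the root handler,
        # which has no level filter of its own, so DEBUG lines are shown.
        logging.getLogger(name).setLevel(logging.DEBUG)


if __name__ == "__main__":
    enable_debug_logging()
    logging.getLogger("httpx").debug("debug output is enabled for httpx")
```

With HTTP-level logging on, the request URL appears in the log, which helps confirm whether the call is going to your endpoint or to the default one.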
2. Or create a custom implementation of the planning functionality:
```python
async def alternative_plan(prompt: str):
    """Alternative implementation of marvin.plan()"""
    client = AsyncOpenAI(
        base_url=os.environ["OPENAI_API_BASE"],
        api_key=os.environ["OPENAI_API_KEY"],
        timeout=httpx.Timeout(30.0)
    )
    system_prompt = """
    You are a planning assistant. When given a task:
    1. Break it down into clear, sequential steps
    2. Provide specific, actionable items for each step
    3. Include relevant details and considerations
```
Marvin
07/30/2025, 8:30 PM
marvin.plan()
2. If the custom implementation works
3. What the debug logs show (if you enable them)
This will help us determine whether the issue is specific to marvin.plan() or if we need to implement a custom solution using the direct API calls that we know work.
Sanaz
07/31/2025, 5:47 PM
Marvin
07/31/2025, 5:47 PM
Marvin
07/31/2025, 5:48 PM
marvin.plan(). Let's fix this by properly configuring the agent and the planning structure. Here's the corrected approach:
python
import os
import marvin
from marvin import Agent
import asyncio
# Create a planning-specific agent
planagent = Agent(
    instructions="""You are an expert planning assistant. When creating plans:
    1. Always start with a root task (task 0)
    2. Break down complex tasks into clear subtasks
    3. Ensure each subtask has a clear parent task
    4. Provide specific, actionable items
    """,
    model="your-model-name"  # Your specific model
)
# Configure the environment
os.environ["OPENAI_API_BASE"] = "your-endpoint"
os.environ["OPENAI_API_KEY"] = "your-key"
async def create_plan(question: str):
    try:
        # Create initial root task
        root_task = {
            "id": 0,
            "description": "Main objective",
            "parent_id": None
        }
        # Use marvin.plan() with the configured agent
        plan = await marvin.plan(
            instructions=question,
            agent=planagent,
            initial_tasks=[root_task]  # Provide the root task
        )
        return plan
    except Exception as e:
        print(f"Error creating plan: {str(e)}")
        return None
# Run the planning
async def main():
    question = "Design a launch strategy for a mobile app."
    try:
        result = await create_plan(question)
        if result:
            print("Plan created successfully:")
            print(result)
    except Exception as e:
        print(f"Error in main: {str(e)}")
if __name__ == "__main__":
    asyncio.run(main())
If you're still experiencing issues, here's an alternative approach using a custom planning implementation:
python
import os  # needed for the os.environ calls in main()
import marvin
from marvin import Agent
import asyncio
class PlanningAgent(Agent):
    def __init__(self, model_name: str):
        super().__init__(
            instructions="""You are an expert planning assistant. Create detailed,
            hierarchical plans with clear parent-child relationships between tasks.""",
            model=model_name
        )
    async def create_hierarchical_plan(self, question: str):
        # Create a planning prompt that ensures proper task hierarchy
        planning_prompt = f"""
        Create a detailed plan for: {question}
        Format your response as a hierarchical list where:
        - Each task has a unique ID
        - Each subtask references its parent task ID
        - Start with task ID 0 as the main objective
        Example format:
        0. Main objective
        1. First major task (parent: 0)
        1.1 Subtask (parent: 1)
        1.2 Subtask (parent: 1)
        2. Second major task (parent: 0)
        2.1 Subtask (parent: 2)
        """
        try:
            response = await self.run(planning_prompt)
            return response
        except Exception as e:
            print(f"Error in planning: {str(e)}")
            return None
async def main():
    # Configure the environment
    os.environ["OPENAI_API_BASE"] = "your-endpoint"
    os.environ["OPENAI_API_KEY"] = "your-key"
    # Create the planning agent
    planner = PlanningAgent(model_name="your-model-name")
    # Test the planning
    question = "Design a launch strategy for a mobile app."
    result = await planner.create_hierarchical_plan(question)
    if result:
        print("Generated Plan:")
        print(result)
if __name__ == "__main__":
    asyncio.run(main())
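If the model follows the example format in the prompt above, its text reply can be turned back into a task tree with a few lines of parsing. The sketch below is one possible approach using only the standard library. `PlanTask` and `parse_plan` are hypothetical names, IDs are kept as strings such as "1.1", and lines that do not match the format are skipped.

```python
import re
from dataclasses import dataclass, field
from typing import Dict, List, Optional

# Matches lines such as "1. First major task (parent: 0)" or "1.1 Subtask (parent: 1)".
LINE = re.compile(r"^\s*(\d+(?:\.\d+)*)\.?\s+(.*?)(?:\s*\(parent:\s*([\d.]+)\))?\s*$")


@dataclass
class PlanTask:
    id: str
    description: str
    parent_id: Optional[str] = None
    # default_factory gives every task its own list instead of a shared default.
    subtasks: List["PlanTask"] = field(default_factory=list)


def parse_plan(text: str) -> List[PlanTask]:
    """Build a task tree from the hierarchical list format; returns root tasks."""
    tasks: Dict[str, PlanTask] = {}
    roots: List[PlanTask] = []
    for line in text.splitlines():
        match = LINE.match(line)
        if not match:
            continue  # ignore headings, blank lines, and free text
        task_id, description, parent_id = match.groups()
        task = PlanTask(task_id, description, parent_id)
        tasks[task_id] = task
        parent = tasks.get(parent_id) if parent_id is not None else None
        if parent is None:
            # No parent given, or the parent has not appeared yet: treat as a root.
            roots.append(task)
        else:
            parent.subtasks.append(task)
    return roots


if __name__ == "__main__":
    sample = "0. Main objective\n1. First major task (parent: 0)\n1.1 Subtask (parent: 1)"
    for root in parse_plan(sample):
        print(root.id, root.description, [t.id for t in root.subtasks])
```

Models do not always follow the requested format, so it is worth checking that the parsed tree is non-empty before relying on it.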
And if you need even more control over the planning process, here's a structured planning implementation:
```python
from dataclasses import dataclass, field
from typing import List, Optional
import marvin
from marvin import Agent
@dataclass
class PlanTask:
    id: int
    description: str
    parent_id: Optional[int] = None
    # default_factory gives each task its own list rather than a None default
    subtasks: List['PlanTask'] = field(default_factory=list)
```
Marvin
07/31/2025, 5:48 PM
Sanaz
07/31/2025, 6:04 PM
Marvin
07/31/2025, 6:04 PM
Marvin
07/31/2025, 6:04 PM
marvin.plan() with a custom model, we need to properly structure the planning process and handle the task hierarchy. Here's how to modify your code:
python
import marvin
from marvin import Agent
from openai import AsyncOpenAI
from pydantic_ai.models.openai import OpenAIModel
from pydantic_ai.providers.openai import OpenAIProvider
import asyncio
# Your client setup
client = AsyncOpenAI(
    base_url="your-endpoint",
    api_key="your-key"
)
# Create the planning agent with structured task handling
planagent = marvin.Agent(
    model=OpenAIModel(
        model_name="qwen2.5vl",
        provider=OpenAIProvider(openai_client=client),
    ),
    name="Planner",
    instructions="""You are a planning assistant that creates structured task lists.
    When creating plans:
    1. Always start with a root task (task 0)
    2. Each subsequent task must have a clear parent task
    3. Break down complex tasks into manageable subtasks
    4. Provide specific, actionable instructions
    Format your responses as:
    0. Root Task: [Main Objective]
    1. Task: [Description] (Parent: 0)
    1.1 Subtask: [Description] (Parent: 1)
    1.2 Subtask: [Description] (Parent: 1)
    2. Task: [Description] (Parent: 0)
    """,
)
async def create_structured_plan(question: str, file_info: str):
    # Create the initial root task
    root_task = {
        "id": 0,
        "description": f"Plan for: {question}",
        "parent_id": None
    }
    try:
        # Combine question and file info
        full_prompt = f"{question}\nContext: {file_info}"
        # Create the plan with explicit root task
        plan = await marvin.plan(
            instructions=full_prompt,
            agent=planagent,
            initial_tasks=[root_task]
        )
        return plan
    except Exception as e:
        print(f"Error creating plan: {str(e)}")
        return None
if __name__ == "__main__":
    all_logs = []
    async def process_question():
        with marvin.Thread() as thread:
            index = 160  # Change this to loop if needed
            # `dataset` is assumed to be loaded elsewhere in your script
            print(dataset[index])
            question = dataset[index]["Question"]
            fileinfo = dataset[index]["file_path"]
            finalanswer = dataset[index]["Final answer"]
            try:
                # Create the plan
                plan = await create_structured_plan(question, fileinfo)
                if plan:
                    print("\nGenerated Plan:")
                    for i, task in enumerate(plan):
                        print(f"{i}. {task.instructions}")
                        if hasattr(task, 'subtasks') and task.subtasks:
                            for j, subtask in enumerate(task.subtasks, 1):
                                print(f"   {i}.{j} {subtask.instructions}")
            except Exception as e:
                print(f"Error processing question: {str(e)}")
    # Run the async function
    asyncio.run(process_question())
If you're still getting the parent task error, here's an alternative approach using a custom planning implementation:
```python
import marvin
from marvin import Agent
from dataclasses import dataclass
from typing import List, Optional
@dataclass
class PlanningTask:
    id: int
    description: str
    parent_id: Optional[int] = None
class CustomPlanner:
    def __init__(self, agent: Agent):
        self.agent = agent
    async def create_plan(self, question: str, file_info: str) -> List[str]:
        planning_prompt = f"""
        Create a detailed plan for the following question:
        {question}
        Additional context:
        {file_info}
        Break this down into a numbered list of clear, sequential steps.
        Each step should be specific and actionable.
        """
        try:
            response = await self.agent.run(planning_prompt)
```
Marvin
07/31/2025, 6:04 PM