datamongus
08/22/2025, 6:44 PM
`gather_data` is referenced in `executive_summary`
gather_data = marvin.run(
    "gather information about the stores to build the executive summary",
    context={},
    result_type=List[Dict[str, Any]],
)
executive_summary = marvin.Agent(
    name="Executive Summary Agent",
    instructions="You create an executive summary for stores based on provided data.",
).run(
    instructions=executive_summary_instructions(),
    context={
        "store_data": gather_data
    },
    raise_on_failure=True,
)
Marvin
08/22/2025, 6:44 PM
Yes, this works as long as the value you pass in `context` is an actual Python value (not an unresolved task/future).
A few nuances depending on how you’re running this:
- Plain Python (no Prefect): Your snippet works as written. `gather_data` holds the computed result and you pass it into the next `.run` call.
gather_data = marvin.run(
    "gather information about the stores to build the executive summary",
    context={},
    result_type=List[Dict[str, Any]],
)
executive_summary = marvin.Agent(
    name="Executive Summary Agent",
    instructions="You create an executive summary for stores based on provided data.",
).run(
    instructions=executive_summary_instructions(),
    context={"store_data": gather_data},
    raise_on_failure=True,
)
- In a Prefect 3.x flow with tasks:
  - If `marvin.run` and `Agent.run` are not Prefect tasks, wrap them in Prefect tasks and pass the upstream result to the downstream task:
from typing import Any, Dict, List

import marvin
from prefect import flow, task


@task
def gather_data() -> List[Dict[str, Any]]:
    return marvin.run(
        "gather information about the stores to build the executive summary",
        context={},
        result_type=List[Dict[str, Any]],
    )


@task
def build_exec_summary(store_data: List[Dict[str, Any]]):
    return marvin.Agent(
        name="Executive Summary Agent",
        instructions="You create an executive summary for stores based on provided data.",
    ).run(
        # executive_summary_instructions() is the helper from your snippet
        instructions=executive_summary_instructions(),
        context={"store_data": store_data},
        raise_on_failure=True,
    )


@flow
def pipeline():
    # Calling a task directly inside a flow returns its result, so `data` is a concrete value
    data = gather_data()
    summary = build_exec_summary(data)
    return summary
  - If you try to call `Agent.run` in non-task code while the upstream value is still a Prefect future (for example, one returned by `gather_data.submit()`), you’ll need to resolve it first with `data = future.result()`. If `gather_data` is an async task called directly in an async flow, use `data = await gather_data()`. Otherwise, pass the future into another Prefect task and let Prefect resolve it at runtime (recommended).
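The difference between an unresolved future and a concrete value can be illustrated with the standard library alone. This is only a sketch: it uses `concurrent.futures` as a stand-in rather than Prefect's own future type, and `fetch_store_data`, `build_context`, and the sample rows are hypothetical names introduced for illustration.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Dict, List


def fetch_store_data() -> List[Dict[str, Any]]:
    # Hypothetical stand-in for the upstream "gather data" step.
    return [{"store": "A", "revenue": 1200}, {"store": "B", "revenue": 950}]


def build_context(store_data: Any) -> Dict[str, Any]:
    # Refuse unresolved futures so they never end up inside a prompt context.
    if isinstance(store_data, Future):
        raise TypeError("resolve the future with .result() before building context")
    return {"store_data": store_data}


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(fetch_store_data)
    # .result() blocks until the work finishes and returns the concrete value.
    context = build_context(future.result())
```

The guard in `build_context` is a design choice for this sketch: failing loudly is usually easier to debug than a future object's repr silently appearing in a prompt.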
Questions to make sure we steer you right:
- Are you running this inside a Prefect flow?
- Are `marvin.run` and `Agent.run` themselves Prefect tasks, or plain function calls?
- Are you seeing an error when passing `gather_data` into `context`? If so, what’s the message?
datamongus
08/22/2025, 6:47 PM
Marvin
08/22/2025, 6:47 PM
You can pass the result directly into `context`. Your snippet will work as-is because `gather_data` is a concrete Python value, not a future.
If it helps, here’s a minimal “tool-style” wrapper pattern:
def build_executive_summary():
    store_data = marvin.run(
        "gather information about the stores to build the executive summary",
        context={},
        result_type=List[Dict[str, Any]],
    )
    summary = marvin.Agent(
        name="Executive Summary Agent",
        instructions="You create an executive summary for stores based on provided data.",
    ).run(
        instructions=executive_summary_instructions(),
        context={"store_data": store_data},
        raise_on_failure=True,
    )
    return summary
A few practical tips:
- Make sure `store_data` is reasonably sized; `context` usually ends up in the prompt. If it’s large, consider summarizing/chunking or passing a reference that a tool can fetch.
- Ensure the tool doesn’t accidentally call the same agent that invoked it in a loop (avoid recursive agent-in-agent calls unless you’ve planned for it).
- If you need deterministic behavior or retries, add error handling around the first call before invoking the second.
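One way to keep `store_data` small before it goes into `context` is to cap it by serialized size. The following is a minimal sketch, assuming the rows are JSON-serializable dicts; `trim_to_budget` and the `max_chars` budget are hypothetical and not part of Marvin or Prefect.

```python
import json
from typing import Any, Dict, List


def trim_to_budget(rows: List[Dict[str, Any]], max_chars: int) -> List[Dict[str, Any]]:
    """Keep leading rows while the sum of per-row JSON sizes stays within max_chars."""
    kept: List[Dict[str, Any]] = []
    used = 0
    for row in rows:
        # default=str is a fallback for values json cannot encode directly (e.g. dates).
        size = len(json.dumps(row, default=str))
        if used + size > max_chars:
            break
        kept.append(row)
        used += size
    return kept


rows = [{"store": f"S{i}", "revenue": 1000 + i} for i in range(100)]
small = trim_to_budget(rows, max_chars=200)
```

Character count is only a rough proxy for prompt tokens. If dropping rows loses too much information, summarizing them first is the alternative mentioned in the tip above.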
If you run into any serialization or size issues when passing `store_data`, share the error and I can help adjust the pattern.
datamongus
08/22/2025, 6:49 PM
Marvin
08/22/2025, 6:52 PM
Wrapping a call in `@prefect.task` gives you a lot of orchestration and reliability features you don’t get with a plain function call.
Key benefits:
- Retries with backoff/jitter: Automatically retry transient failures (e.g., 429/5xx) with configurable delays and custom retry conditions.
- Timeouts: Cap how long a call can run; hung API calls won’t block your pipeline indefinitely.
- Caching and deduplication: Skip re-running work when inputs haven’t changed; control cache keys and expiration to avoid repeat LLM/API costs.
- Concurrency and rate limits: Enforce global limits across flows/runs so you don’t overload APIs or resources. Works with named limits you define in the UI/API.
- Result persistence: Persist task outputs to result storage for later inspection/reuse; choose serializer and storage.
- Observability: Every task has a run, state, logs, duration, and exceptions visible in the Prefect UI. You can add hooks, artifacts, and structured logs.
- Parallelism and orchestration: Prefect schedules tasks, tracks dependencies, and can execute in parallel via the task runner; failures propagate with clear upstream/downstream context.
- Hooks and notifications: Run custom logic on success/failure (e.g., send a Slack message on failure).
- Logging: Capture `print`s with `log_prints=True` and use `get_run_logger()` for structured logs.
- Naming and tagging: Give tasks human-readable names, dynamic run names, and tags to filter and analyze in the UI.
Why this matters for tools/LLM calls:
- You can add retries for transient HTTP errors, enforce timeouts, cache by prompt/parameters, and apply global rate limits to keep costs and throttling under control.
- You get full visibility and reproducibility when something goes wrong.
Example: wrapping an LLM/API call safely
from datetime import timedelta

from prefect import flow, get_run_logger, task
from prefect.concurrency.sync import rate_limit


def should_retry(task, task_run, state) -> bool:
    # Prefect calls retry_condition_fn with (task, task_run, state);
    # state.result() re-raises the exception that failed the run.
    try:
        state.result()
    except Exception as exc:
        status = getattr(getattr(exc, "response", None), "status_code", None)
        # Retry common transient conditions
        return isinstance(exc, (TimeoutError, ConnectionError)) or status in {429, 500, 502, 503, 504}
    return False


@task(
    name="Call LLM",
    retries=3,
    retry_delay_seconds=[1, 2, 5],
    retry_jitter_factor=0.2,
    retry_condition_fn=should_retry,
    timeout_seconds=60,
    # Cache by prompt for 1 day to avoid repeated calls
    cache_key_fn=lambda ctx, params: f"llm:{params.get('prompt')}",
    cache_expiration=timedelta(days=1),
    persist_result=True,
    log_prints=True,
)
def call_llm(prompt: str) -> str:
    logger = get_run_logger()
    # Use a named limit you’ve created in the UI/API, e.g., name="openai".
    # rate_limit() is a plain blocking call, not a context manager.
    rate_limit("openai", occupy=1)
    logger.info("Calling LLM...")
    # ... make the API call here ...
    return "summary..."


@flow
def build_executive_summary(prompt: str):
    return call_llm(prompt)
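The example above keys the cache on the raw prompt string. If the task later takes more parameters (a model name, a temperature), hashing all of them gives a short key that changes whenever any input changes. A minimal sketch, assuming the same `(context, parameters)` signature that the lambda above uses; `prompt_cache_key` is a hypothetical name:

```python
import hashlib
import json
from typing import Any, Dict


def prompt_cache_key(context: Any, parameters: Dict[str, Any]) -> str:
    """Build a stable cache key from all call parameters, not only the prompt."""
    # sort_keys makes the key independent of argument order;
    # default=str is a fallback for values json cannot encode directly.
    payload = json.dumps(parameters, sort_keys=True, default=str)
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    return f"llm:{digest}"
```

It would be passed as `cache_key_fn=prompt_cache_key` in place of the lambda. The `context` argument is unused here and kept only to match the expected signature.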
When to use tasks vs plain functions:
- Use plain functions for simple, local logic that doesn’t need retries, timeouts, caching, or UI visibility.
- Use `@task` when you want resiliency, observability, concurrency/rate-limiting, or result management, especially for external systems like LLMs and HTTP APIs.
Docs and references:
- Prefect 3 release notes and concepts: Prefect 3 overview
- Global concurrency limits: Global concurrency limits
- Results and persistence: Results and persistence
- Tasks, retries, and logging patterns: Task result, retry, and logging patterns
If you share your current tool code, I can show exactly how I’d wrap it with `@task` and which options make the most sense.