PyHannes
09/28/2025, 7:38 AMMarvin
09/28/2025, 7:38 AMMarvin
09/28/2025, 7:42 AM# tool: trigger a Prefect deployment by name
from typing import Optional, Dict, Any
from prefect.deployments.flow_runs import run_deployment
def run_prefect_deployment(
deployment_name: str,
parameters: Optional[Dict[str, Any]] = None,
flow_run_name: Optional[str] = None,
) -> str:
"""
Trigger a Prefect deployment by its full name and return the flow run ID.
Example deployment_name: 'project/flow:deployment'
Parameters map to your flow parameters, e.g. {"start_date": "2024-01-01"}.
"""
fr = run_deployment(
name=deployment_name,
parameters=parameters,
flow_run_name=flow_run_name,
)
return f"Scheduled flow run {fr.id} for {deployment_name}"
Wire it into a Marvin agent (simple run)
from marvin import run
user_request = "Backfill orders for 2024-01-01 to 2024-01-07"
result = run(
instructions=(
"You are an operations agent. Decide which Prefect deployment to run and call the tool. "
"If parameters are needed, infer them from the user message."
),
tools=[run_prefect_deployment],
messages=[user_request],
)
print(result)
Example: one tool per flow (often easier for the model)
from prefect.deployments.flow_runs import run_deployment
def run_daily_report(date: str) -> str:
"""
Run the 'analytics/daily_report:prod' deployment for a specific date (YYYY-MM-DD).
"""
fr = run_deployment("analytics/daily_report:prod", parameters={"date": date})
return f"Daily report scheduled: {fr.id}"
def run_backfill(start_date: str, end_date: str) -> str:
"""
Run the 'analytics/backfill:default' deployment for a date range (YYYY-MM-DD).
"""
fr = run_deployment(
"analytics/backfill:default",
parameters={"start_date": start_date, "end_date": end_date},
)
return f"Backfill scheduled: {fr.id}"
from marvin import run
result = run(
instructions="Choose the most appropriate tool to satisfy the user's request.",
tools=[run_daily_report, run_backfill],
messages=["Backfill orders for 2024-01-01 to 2024-01-07"],
)
Async (if you prefer)
import asyncio
from typing import Optional, Dict, Any
from prefect.client.orchestration import get_client
async def run_deployment_async(
deployment_name: str,
parameters: Optional[Dict[str, Any]] = None,
) -> str:
"""
Async trigger using the Prefect client. deployment_name like 'project/flow:deployment'.
"""
async with get_client() as client:
dep = await client.read_deployment_by_name(deployment_name)
fr = await client.create_flow_run_from_deployment(
dep.id,
parameters=parameters or {},
)
return str(fr.id)
# in Marvin, you can register async tools as well
Requirements and tips
- Deployments and workers: Ensure your flows are deployed and a worker is running for the target work pool so runs get picked up.
- In Prefect 3.x, build deployments with flow.from_source(...).deploy(...)
and submit runs with prefect.deployments.flow_runs.run_deployment(...)
.
- Auth: Set Prefect auth/env before running Marvin so tools can talk to your Prefect API:
- PREFECT_API_URL (Cloud example): https://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>
- PREFECT_API_KEY: your Prefect Cloud API keyMarvin
09/28/2025, 7:42 AM