PyHannes
09/18/2025, 8:29 AM@dataclass
class Survey:
name: str
age: int
interests: list[str]
experience_level: Literal["beginner", "intermediate", "advanced"]
if __name__ == '__main__':
with marvin.Thread() as thread:
# Create an interactive agent
agent = marvin.Agent(
name="Surveyor",
instructions="Conduct a friendly survey about programming experience."
)
# Run the survey
result = marvin.run(
"Conduct a programming survey",
agents=[agent],
result_type=Survey,
cli=True
)
print(result)
I also have a Prefect server. Is there an easy way of providing observability of what marvin is doing during it's workflow by showing its steps as Prefect tasks?
It seem like Marvin tasks and Prefect tasks should map together quite nicely.Marvin
09/18/2025, 8:29 AMMarvin
09/18/2025, 8:33 AMcli=False
when running inside a flow.
Here’s a minimal pattern you can drop in:
from dataclasses import dataclass
from typing import Literal
import marvin
from prefect import flow, task, get_run_logger
from prefect.artifacts import create_markdown_artifact
from prefect.events.utilities import emit_event
@dataclass
class Survey:
name: str
age: int
interests: list[str]
experience_level: Literal["beginner", "intermediate", "advanced"]
@task
def record_message(thread_id: str, idx: int, role: str, content: str):
logger = get_run_logger()
<http://logger.info|logger.info>(f"[Marvin:{thread_id}] {idx} {role}: {content}")
# Emit a Prefect Event for the timeline
emit_event(
event="marvin.message",
resource={"prefect.resource.id": f"marvin.thread.{thread_id}"},
payload={"index": idx, "role": role, "content": content},
)
@flow
def survey_flow():
logger = get_run_logger()
with marvin.Thread() as thread:
agent = marvin.Agent(
name="Surveyor",
instructions="Conduct a friendly survey about programming experience."
)
# Important: avoid CLI interactivity inside flows
result = marvin.run(
"Conduct a programming survey",
agents=[agent],
result_type=Survey,
cli=False,
)
# Log/emit each message as its own Prefect task
# Depending on Marvin version, messages are likely on `thread.messages`
# and each has `.role` / `.content`
messages = getattr(thread, "messages", [])
for i, m in enumerate(messages):
record_message.submit(str(getattr(thread, "id", "unknown")), i, m.role, m.content)
# Publish a single artifact with the transcript
transcript_lines = [f"{m.role}: {m.content}" for m in messages]
transcript_md = "# Surveyor transcript\n\n" + "\n\n".join(transcript_lines) if messages else "No messages recorded."
create_markdown_artifact(markdown=transcript_md, description="Conversation transcript from Marvin Surveyor")
<http://logger.info|logger.info>(f"Final structured result: {result}")
return result
if __name__ == "__main__":
survey_flow()
What you get in Prefect:
- A flow run representing the entire Marvin session
- One task run per message (from record_message
), so you can click into each step
- A transcript artifact attached to the flow run
- Events on the timeline for each message
Why this approach:
- It doesn’t require changes inside Marvin; it uses Marvin’s thread state after the run
- It works in any worker environment
- It’s easy to extend (e.g., emit events for tool starts/ends if you can hook those in your code)
If you want even tighter step-level tracking:
- If Marvin exposes callbacks for “on_message” / “on_tool_start” / “on_tool_end” in your version, you can call record_message.submit(...)
or emit_event(...)
inside those callbacks to surface each event in real time.Marvin
09/18/2025, 8:33 AMmarvin.run
(e.g., sending one message at a time) and wrap each send/receive in a task. That’s possible, but it depends on the Marvin API you’re using.
Docs and references:
- Prefect Artifacts: Artifacts
- Prefect Events: Events
- Marvin repo: Marvin on GitHub
- Blog example (Marvin + Prefect): Using Marvin with Prefect
A couple of quick questions to tailor this better:
- Which Marvin version are you on? If you have message/tool callbacks available, we can wire those to Prefect tasks/events directly.
- Are you okay dropping CLI interactivity (cli=False
) inside your flows, or do you plan to run this locally where a TTY is available?
- Would you prefer the lightweight logs/events/artifacts approach, or do you want each agent step to be an actual Prefect task that gates the next step?