<@ULVA73B9P> Let's say i have a simple agentic fl...
# ask-marvin
p
@Marvin Let's say I have a simple agentic flow like this:
```python
from dataclasses import dataclass
from typing import Literal

import marvin


@dataclass
class Survey:
    name: str
    age: int
    interests: list[str]
    experience_level: Literal["beginner", "intermediate", "advanced"]


if __name__ == '__main__':
    with marvin.Thread() as thread:
        # Create an interactive agent
        agent = marvin.Agent(
            name="Surveyor",
            instructions="Conduct a friendly survey about programming experience."
        )

        # Run the survey
        result = marvin.run(
            "Conduct a programming survey",
            agents=[agent],
            result_type=Survey,
            cli=True
        )

    print(result)
```
I also have a Prefect server. Is there an easy way of providing observability of what Marvin is doing during its workflow by showing its steps as Prefect tasks? It seems like Marvin tasks and Prefect tasks should map together quite nicely.
m
thought for 243.2 seconds
Great question, and you’re right: conceptually, Marvin’s agent steps map nicely onto Prefect's observability primitives. Today there isn’t an out-of-the-box toggle that turns each Marvin “step” into a Prefect task, but you can get excellent visibility with a small amount of instrumentation:
- Wrap the agent run in a Prefect flow (so you get run state, retries, and logs)
- Capture the conversation messages from the Marvin thread and:
  - log them with Prefect’s logger
  - emit Prefect Events (nice timeline in the UI)
  - publish a transcript as a Prefect Artifact
- If you want each step to show up as a separate “Task” in the UI, call a small @task once per message

Notes:
- In Prefect 3.x (assumed), running interactive CLI prompts inside a flow is not recommended; workers typically don’t have a TTY. Set `cli=False` when running inside a flow.

Here’s a minimal pattern you can drop in:
```python
from dataclasses import dataclass
from typing import Literal

import marvin
from prefect import flow, task, get_run_logger
from prefect.artifacts import create_markdown_artifact
from prefect.events.utilities import emit_event


@dataclass
class Survey:
    name: str
    age: int
    interests: list[str]
    experience_level: Literal["beginner", "intermediate", "advanced"]


@task
def record_message(thread_id: str, idx: int, role: str, content: str):
    logger = get_run_logger()
    logger.info(f"[Marvin:{thread_id}] {idx} {role}: {content}")
    # Emit a Prefect Event for the timeline
    emit_event(
        event="marvin.message",
        resource={"prefect.resource.id": f"marvin.thread.{thread_id}"},
        payload={"index": idx, "role": role, "content": content},
    )


@flow
def survey_flow():
    logger = get_run_logger()

    with marvin.Thread() as thread:
        agent = marvin.Agent(
            name="Surveyor",
            instructions="Conduct a friendly survey about programming experience."
        )

        # Important: avoid CLI interactivity inside flows
        result = marvin.run(
            "Conduct a programming survey",
            agents=[agent],
            result_type=Survey,
            cli=False,
        )

        # Depending on Marvin version, messages are likely on `thread.messages`
        # and each has `.role` / `.content`; adjust these lookups if yours differs
        thread_id = str(getattr(thread, "id", "unknown"))
        messages = getattr(thread, "messages", [])

        # Record each message as its own Prefect task run. Keep the futures and
        # wait on them so every task finishes before the flow returns.
        futures = [
            record_message.submit(thread_id, i, str(m.role), str(m.content))
            for i, m in enumerate(messages)
        ]
        for future in futures:
            future.wait()

        # Publish a single artifact with the transcript
        transcript_lines = [f"{m.role}: {m.content}" for m in messages]
        if messages:
            transcript_md = "# Surveyor transcript\n\n" + "\n\n".join(transcript_lines)
        else:
            transcript_md = "No messages recorded."
        create_markdown_artifact(
            markdown=transcript_md,
            description="Conversation transcript from Marvin Surveyor",
        )

        logger.info(f"Final structured result: {result}")
        return result


if __name__ == "__main__":
    survey_flow()
```
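Because the shape of the stored messages depends on your Marvin version, it can help to isolate the lookup in one place. The following is a minimal sketch, not Marvin's API: `normalize_message` and `render_transcript` are hypothetical helper names, and the `role` / `content` fields are the same assumption the flow above makes. It uses only the standard library, so you can check it against your real message objects before wiring it into a flow.

```python
from typing import Any, Iterable


def normalize_message(message: Any) -> tuple[str, str]:
    """Return (role, content) from a dict or an object.

    The `role` / `content` names are assumptions; missing fields fall back
    to "unknown" and an empty string instead of raising.
    """
    if isinstance(message, dict):
        role = message.get("role", "unknown")
        content = message.get("content", "")
    else:
        role = getattr(message, "role", "unknown")
        content = getattr(message, "content", "")
    return str(role), str(content)


def render_transcript(messages: Iterable[Any], title: str = "Surveyor transcript") -> str:
    """Build a markdown transcript suitable for a markdown artifact."""
    pairs = [normalize_message(m) for m in messages]
    if not pairs:
        return "No messages recorded."
    body = "\n\n".join(f"**{role}**: {content}" for role, content in pairs)
    return f"# {title}\n\n{body}"
```

The string returned by `render_transcript` could be passed as the `markdown` argument of `create_markdown_artifact`, and the pairs from `normalize_message` could feed `record_message`.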
What you get in Prefect:
- A flow run representing the entire Marvin session
- One task run per message (from `record_message`), so you can click into each step
- A transcript artifact attached to the flow run
- Events on the timeline for each message

Why this approach:
- It doesn’t require changes inside Marvin; it uses Marvin’s thread state after the run
- It works in any worker environment
- It’s easy to extend (e.g., emit events for tool starts/ends if you can hook those in your code)

If you want even tighter step-level tracking:
- If Marvin exposes callbacks for “on_message” / “on_tool_start” / “on_tool_end” in your version, you can call `record_message.submit(...)` or `emit_event(...)` inside those callbacks to surface each event in real time.
- If you want to truly run each LLM/tool step as a Prefect task before the agent proceeds, you’d need to drive the agent at a lower level than `marvin.run` (e.g., sending one message at a time) and wrap each send/receive in a task. That’s possible, but it depends on the Marvin API you’re using.

Docs and references:
- Prefect Artifacts: Artifacts
- Prefect Events: Events
- Marvin repo: Marvin on GitHub
- Blog example (Marvin + Prefect): Using Marvin with Prefect

A couple of quick questions to tailor this better:
- Which Marvin version are you on? If you have message/tool callbacks available, we can wire those to Prefect tasks/events directly.
- Are you okay dropping CLI interactivity (`cli=False`) inside your flows, or do you plan to run this locally where a TTY is available?
- Would you prefer the lightweight logs/events/artifacts approach, or do you want each agent step to be an actual Prefect task that gates the next step?
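For the gating option (each exchange finishes as its own step before the next one starts), here is a rough sketch of the control flow only. Everything in it is hypothetical: `run_gated`, `identity`, and `fake_send` are illustrative names, and `fake_send` stands in for whatever single-turn call your Marvin version offers. The wrapper is passed in as a parameter so the sketch runs without Prefect installed.

```python
from typing import Callable, Iterable

Step = Callable[[str], str]


def identity(fn: Step) -> Step:
    """Default wrapper: leave the step function unchanged (no Prefect needed)."""
    return fn


def run_gated(
    send: Step,
    prompts: Iterable[str],
    wrap: Callable[[Step], Step] = identity,
) -> list[tuple[str, str]]:
    """Send prompts one at a time; each reply arrives before the next prompt is sent."""
    step = wrap(send)  # the wrapper decides how each exchange is tracked
    transcript: list[tuple[str, str]] = []
    for prompt in prompts:
        reply = step(prompt)  # blocks here, so this step gates the next one
        transcript.append((prompt, reply))
    return transcript


def fake_send(prompt: str) -> str:
    """Hypothetical stand-in for a single agent exchange."""
    return f"echo: {prompt}"
```

Inside a `@flow`, passing Prefect's `task` as `wrap` should make each exchange show up as its own task run, since calling a task directly blocks until it completes. Whether Marvin lets you drive an agent one exchange at a time depends on the version you are using.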