<@ULVA73B9P> I'm trying to setup testing with pref...
# marvin-ai
k
@Marvin I'm trying to set up testing with Prefect, and I'd like to have the inputs and outputs of each task (to test against). How can I get the inputs and outputs of the `TaskRun`s?
@pytest.mark.asyncio
async def test_my_favorite_flow():
    result = await trigger_flow()

    async with get_client() as client:
        flow_runs: list[FlowRun] = await client.read_flow_runs()  # returns all flow runs
        flow_run = max(flow_runs, key=lambda fr: fr.created or "")

        task_runs: list[TaskRun] = await client.read_task_runs(
            flow_run_filter=FlowRunFilter.model_validate({"id": {"any_": [str(flow_run.id)]}}),
            limit=200,
        )

        for tr in task_runs:
            states: list[State] = await client.read_task_run_states(task_run_id=tr.id)
            completed = next((s for s in reversed(states) if s.type == "COMPLETED"), None)
            if not completed:
                continue

            # outputs
            data_doc = completed.data
            if data_doc is not None:
                storage_key = getattr(data_doc, "storage_key", None)
                if storage_key and Path(storage_key).exists():
                    with open(storage_key, "rb") as f:
                        value = cloudpickle.load(f)
                    print(tr.task_key, "output:", value)
                else:
                    print(tr.task_key, "output stored remotely or not persisted")
m
thought for 501.9 seconds
Great question. In Prefect 3.x you don’t need to open the `storage_key` manually to get results. There are built-in helpers to load both task outputs and (when available) task inputs:
- Outputs: use `prefect.states.get_state_result(state)`. It loads inline results or pulls them from the configured result storage (local or remote) for you.
- Inputs: when Prefect stores task parameters, the task’s state contains a `state_details.task_parameters_id`. You can load those bound arguments with `prefect.task_worker.read_parameters` and the workspace’s `ResultStore`.

Updated version of your loop:
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter
from prefect.client.schemas.objects import StateType
from prefect.states import get_state_result
from prefect.results import get_result_store
from prefect.task_worker import read_parameters

@pytest.mark.asyncio
async def test_my_favorite_flow():
    result = await trigger_flow()

    async with get_client() as client:
        flow_runs = await client.read_flow_runs()
        flow_run = max(flow_runs, key=lambda fr: fr.created or "")

        task_runs = await client.read_task_runs(
            flow_run_filter=FlowRunFilter.model_validate(
                {"id": {"any_": [str(flow_run.id)]}}
            ),
            limit=200,
        )

        for tr in task_runs:
            states = await client.read_task_run_states(task_run_id=tr.id)
            completed = next((s for s in reversed(states) if s.type is StateType.COMPLETED), None)
            if not completed:
                continue

            # OUTPUT: robustly load the result (inline or from remote storage)
            try:
                output_value = get_state_result(completed)
                print(tr.task_key, "output:", output_value)
            except Exception as exc:
                print(tr.task_key, f"output could not be loaded: {exc}")

            # INPUTS: if parameters were stored, read them from the ResultStore
            params = None
            tp_id = getattr(completed.state_details, "task_parameters_id", None)
            if tp_id:
                try:
                    store = get_result_store()
                    params = await read_parameters(store, tp_id)  # read_parameters is a coroutine function
                    print(tr.task_key, "inputs:", params)
                except Exception as exc:
                    print(tr.task_key, f"inputs could not be loaded: {exc}")
            else:
                print(tr.task_key, "inputs not available (no task_parameters_id)")
Notes and tips:
- To reliably retrieve outputs after a run, ensure results are persisted. Set `persist_result=True` on tasks you want to inspect, e.g.:
from prefect import task

@task(persist_result=True)
def my_task(x):
    ...
and configure your result storage (local or a block pointing to S3/GCS/etc.).
- `get_state_result` works for both task and flow states and handles remote/local result loading so you don’t have to chase `storage_key`.
- Inputs are available only when Prefect stored the task’s parameters; when present, you’ll find the UUID at `state.state_details.task_parameters_id`.

References:
- States helper used above: `get_state_result` in `prefect.states`
- Reading stored parameters: `read_parameters` in `prefect.task_worker`
- Result storage utilities: `prefect.results` module

If you share how you’re running the flow (in-process vs. worker) and which result storage you’ve configured, I can suggest the best persistence settings so inputs/outputs are always available for your tests.
k
@Marvin well, it's in a test context, so I don't really need external storage. That being said:
choose_rollout-ffc4f3d2 output: <coroutine object get_state_result at 0x7901755f9a80>
choose_rollout-ffc4f3d2 inputs not available (no task_parameters_id)
generate_rollout-94e7ea2c output: <coroutine object get_state_result at 0x79017544d3f0>
generate_rollout-94e7ea2c inputs not available (no task_parameters_id)
generate_rollout-94e7ea2c output: <coroutine object get_state_result at 0x79017544d310>
generate_rollout-94e7ea2c inputs not available (no task_parameters_id)
generate_rollout-94e7ea2c output: <coroutine object get_state_result at 0x79017544e0a0>
generate_rollout-94e7ea2c inputs not available (no task_parameters_id)
aggregate_precalcs-aa9503ee output: <coroutine object get_state_result at 0x79017544f3e0>
aggregate_precalcs-aa9503ee inputs not available (no task_parameters_id)
generate_rollout-94e7ea2c output: <coroutine object get_state_result at 0x79017544f530>
generate_rollout-94e7ea2c inputs not available (no task_parameters_id)
generate_rollout-94e7ea2c output: <coroutine object get_state_result at 0x79017544f140>
generate_rollout-94e7ea2c inputs not available (no task_parameters_id)
calculate_container_bom_allocations-9f59289a output: <coroutine object get_state_result at 0x7901743760a0>
calculate_container_bom_allocations-9f59289a inputs not available (no task_parameters_id)
process_info_previous_rollouts-be32e8b8 output: <coroutine object get_state_result at 0x790174375930>
process_info_previous_rollouts-be32e8b8 inputs not available (no task_parameters_id)
predict_fry_lengths-24d60ac1 output: <coroutine object get_state_result at 0x7901743747b0>
predict_fry_lengths-24d60ac1 inputs not available (no task_parameters_id)
so I'm not really getting the inputs, nor the values of the output. This is the flow:
@flow(name="trigger-flow", log_prints=True, persist_result=True, task_runner=ThreadPoolTaskRunner(max_workers=3))  # pyright: ignore[reportArgumentType, reportCallIssue]
async def trigger_flow():
    ...

    seeds = [1008, 1013, 2021, 1034, 1054]

    results = generate_rollout.map([input_] * len(seeds), seeds)

    final_rollout = choose_rollout(results)

    return final_rollout
which does add some inputs to e.g. `choose_rollout`
m
thought for 418.9 seconds
Thanks. Two things are happening here:
1) You’re seeing a coroutine for outputs because `get_state_result` is async in your environment, so you need to await it.
2) You’re not seeing inputs because (in local execution with `ThreadPoolTaskRunner`) Prefect does not persist task parameters, so `state.state_details.task_parameters_id` is not set.

Here’s a corrected version of your loop that awaits results:
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter
from prefect.client.schemas.objects import StateType
from prefect.states import get_state_result

@pytest.mark.asyncio
async def test_my_favorite_flow():
    result = await trigger_flow()

    async with get_client() as client:
        flow_runs = await client.read_flow_runs()
        flow_run = max(flow_runs, key=lambda fr: fr.created or "")

        task_runs = await client.read_task_runs(
            flow_run_filter=FlowRunFilter.model_validate(
                {"id": {"any_": [str(flow_run.id)]}}
            ),
            limit=200,
        )

        for tr in task_runs:
            states = await client.read_task_run_states(task_run_id=tr.id)
            completed = next((s for s in reversed(states) if s.type is StateType.COMPLETED), None)
            if not completed:
                continue

            try:
                value = await get_state_result(completed)  # IMPORTANT: await
                print(tr.task_key, "output:", value)
            except Exception as exc:
                print(tr.task_key, f"output could not be loaded: {exc}")

            # Inputs: not available in local thread runner
            print(tr.task_key, "inputs not available (no task_parameters_id)")
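The `<coroutine object ...>` lines in the earlier output are what Python prints whenever an async function is called without `await`. A minimal, Prefect-free sketch of that behaviour, where `load_result` is a hypothetical stand-in for an async loader such as `get_state_result`:

```python
import asyncio
import inspect


async def load_result(value):
    """Hypothetical stand-in for an async result loader."""
    await asyncio.sleep(0)  # pretend to do some I/O
    return value


async def main():
    # Calling without await only creates a coroutine object; nothing runs yet.
    pending = load_result(42)
    was_coroutine = inspect.iscoroutine(pending)

    # Awaiting the coroutine actually runs it and yields the value.
    loaded = await pending
    return was_coroutine, loaded


was_coroutine, loaded = asyncio.run(main())
print(was_coroutine, loaded)
```

Printing `pending` instead of `loaded` would show a coroutine object, just like the test output did.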
About inputs in tests
- In local/threaded runs, Prefect doesn’t store task parameters; `task_parameters_id` will be None. The `read_parameters(...)` path is only used when tasks are executed by a Task Worker that persists parameters for remote execution.
- If you need inputs for assertions in a test:
  - Instrument your tasks to emit artifacts (or logs) with their bound args and assert on those. Example:
from prefect import task
from prefect.runtime import task_run
from prefect.artifacts import create_json_artifact

@task(persist_result=True)
def choose_rollout(results):
    create_json_artifact(
        key=f"inputs-{task_run.id}",
        data={"results_len": len(results)}
    )
    ...
Then query artifacts in your test and assert on their contents.
  - Or, since your `choose_rollout` receives the outputs of the mapped `generate_rollout` tasks, you can reconstruct its “inputs” by reading those upstream task outputs and asserting on the list you would have passed in.
  - For pure unit tests, call the task’s underlying function directly with known arguments (for example `choose_rollout.fn(...)`) and assert on behavior; this runs the plain Python function without Prefect orchestration.

A few extra tips
- If you want outputs to always be retrievable later, set `persist_result=True` explicitly on the tasks whose results you want to inspect. As far as the Prefect 3.x results documentation describes it, flow-level `persist_result=True` is also applied by default to tasks called within that flow, with explicit task-level settings taking precedence; setting it on the task keeps the test independent of the flow’s configuration.
- `get_state_result` handles inline vs. stored results for you; no need to open `storage_key` files manually.

References
- `get_state_result` source: https://github.com/PrefectHQ/prefect/blob/main/src/prefect/states.py#get_state_result
- Task parameter persistence helpers (used in worker scenarios): https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_worker.py#L523

If you want, share one of your task definitions (e.g., `generate_rollout` and `choose_rollout`) and I can sketch a small artifact-based input capture pattern tailored to your test.
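To reconstruct a downstream task’s inputs from upstream outputs, it can help to group the loaded results by task name first. A minimal sketch, assuming the `(task_key, output)` pairs have already been collected in the test loop and that task keys look like `name-hash` as in the printed output; `group_outputs_by_task` and the sample values are hypothetical:

```python
from collections import defaultdict


def group_outputs_by_task(records):
    """Group (task_key, output) pairs by task name, dropping the hash suffix."""
    grouped = defaultdict(list)
    for task_key, output in records:
        # "generate_rollout-94e7ea2c" -> "generate_rollout"
        name, _, _suffix = task_key.rpartition("-")
        grouped[name or task_key].append(output)
    return dict(grouped)


# Illustrative values only; in the test these would come from get_state_result.
records = [
    ("generate_rollout-94e7ea2c", 1.5),
    ("generate_rollout-94e7ea2c", 2.5),
    ("choose_rollout-ffc4f3d2", 2.5),
]
grouped = group_outputs_by_task(records)
upstream = sorted(grouped["generate_rollout"])
print(upstream)
```

The order in which task runs come back may not match the mapping order, so comparing sorted lists (or sets) is the safer assertion.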
k
@Marvin so with the loop above, I get the output values, but the inputs are not shown because they are not available this way. I did set `persist_result=True`, so I'd expect them to be persisted somewhere, and hence, retrievable?
choose_rollout-ffc4f3d2 output: 278.5704334531748
choose_rollout-ffc4f3d2 inputs not available (no task_parameters_id)
generate_rollout-94e7ea2c output: 277.68456421271276
generate_rollout-94e7ea2c inputs not available (no task_parameters_id)
generate_rollout-94e7ea2c output: 277.82360947495306
generate_rollout-94e7ea2c inputs not available (no task_parameters_id)
generate_rollout-94e7ea2c output: 277.98548673144745
generate_rollout-94e7ea2c inputs not available (no task_parameters_id)
aggregate_precalcs-aa9503ee output: [277.9658531202372, 278.46997724655864, 276.3281160860367]
aggregate_precalcs-aa9503ee inputs not available (no task_parameters_id)
generate_rollout-94e7ea2c output: 277.8614720668994
generate_rollout-94e7ea2c inputs not available (no task_parameters_id)
generate_rollout-94e7ea2c output: 277.93590628245454
generate_rollout-94e7ea2c inputs not available (no task_parameters_id)
calculate_container_bom_allocations-9f59289a output: 278.46997724655864
calculate_container_bom_allocations-9f59289a inputs not available (no task_parameters_id)
process_info_previous_rollouts-be32e8b8 output: 276.3281160860367
process_info_previous_rollouts-be32e8b8 inputs not available (no task_parameters_id)
predict_fry_lengths-24d60ac1 output: 277.9658531202372
predict_fry_lengths-24d60ac1 inputs not available (no task_parameters_id)
That being said, also, `create_json_artifact` doesn't seem to exist, and I'd like to know how to fetch them in the loop above - I assume they are fetchable starting from the task run?
m
thought for 221.0 seconds
You’re right: you’re getting outputs now, but not inputs. Two clarifications:
- `persist_result=True` only persists the return value (output), not the bound args/kwargs (inputs). That’s why `state_details.task_parameters_id` is None in your local `ThreadPoolTaskRunner` run.
- There isn’t a `create_json_artifact` helper in Prefect 3.x. Use `create_markdown_artifact` or `create_table_artifact` to record structured inputs you can fetch later.

If you want to fetch inputs from each TaskRun in your loop, the simplest approach is to emit an artifact inside each task with the inputs, then load those artifacts by `task_run_id`.

Example: capture inputs inside tasks
from prefect import task
from prefect.runtime import task_run
from prefect.artifacts import create_markdown_artifact  # or create_table_artifact
import json

@task(persist_result=True)
def generate_rollout(input_, seed):
    create_markdown_artifact(
        key=f"inputs-{task_run.id}",
        markdown="```json\n" + json.dumps({"input": input_, "seed": seed}, default=str) + "\n```",
    )
    # ... do work ...
    return 42  # your real result

@task(persist_result=True)
def choose_rollout(results):
    create_markdown_artifact(
        key=f"inputs-{task_run.id}",
        markdown="```json\n" + json.dumps({"results_len": len(results)}, default=str) + "\n```",
    )
    # ... do work ...
    return max(results)
Then, fetch those artifacts per task run in your test loop:
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter, ArtifactFilter
from prefect.client.schemas.objects import StateType
from prefect.states import get_state_result

@pytest.mark.asyncio
async def test_my_favorite_flow():
    result = await trigger_flow()

    async with get_client() as client:
        flow_runs = await client.read_flow_runs()
        flow_run = max(flow_runs, key=lambda fr: fr.created or "")

        task_runs = await client.read_task_runs(
            flow_run_filter=FlowRunFilter.model_validate({"id": {"any_": [str(flow_run.id)]}}),
            limit=200,
        )

        for tr in task_runs:
            states = await client.read_task_run_states(task_run_id=tr.id)
            completed = next((s for s in reversed(states) if s.type is StateType.COMPLETED), None)
            if not completed:
                continue

            # Output value (works because persist_result=True on the task)
            value = await get_state_result(completed)
            print(tr.task_key, "output:", value)

            # Fetch any input artifacts created by the task
            artifacts = await client.read_artifacts(
                artifact_filter=ArtifactFilter.model_validate(
                    {"task_run_id": {"any_": [str(tr.id)]}}
                )
            )
            if artifacts:
                for a in artifacts:
                    print(tr.task_key, "artifact:", getattr(a, "data", None))
            else:
                print(tr.task_key, "no input artifacts found")
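The loop above prints the raw artifact body, which for the tasks shown earlier is a fenced JSON snippet inside markdown. To assert on the inputs, the test has to parse that JSON back out. A minimal sketch using only the standard library; `to_markdown` and `extract_json_block` are hypothetical helpers, and the sketch assumes the artifact’s `data` holds the markdown string in the same fenced-JSON shape the tasks wrote:

```python
import json
import re

FENCE = "`" * 3  # three backticks, built this way to keep the example readable


def to_markdown(inputs):
    """Serialize inputs the same way the tasks above do."""
    return FENCE + "json\n" + json.dumps(inputs, default=str) + "\n" + FENCE


def extract_json_block(markdown):
    """Return the parsed JSON from the first fenced json block, or None."""
    pattern = re.escape(FENCE) + r"json\n(.*?)\n" + re.escape(FENCE)
    match = re.search(pattern, markdown, re.DOTALL)
    return json.loads(match.group(1)) if match else None


body = to_markdown({"input": "X", "seed": 1008})
inputs = extract_json_block(body)
print(inputs)
```

In the test loop, something like `extract_json_block(a.data)` would then give a dict to assert against.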
Why inputs aren’t showing up automatically
- In local in-process/threaded execution, Prefect does not persist task parameters. The `task_parameters_id` in `state_details` is only set when a Task Worker persists parameters for remote/distributed execution. There isn’t a toggle to “persist inputs” for local thread runs; `persist_result` applies to outputs only.

References
- Artifacts API: Artifacts in Prefect 3.x
- Helper we used for results: `prefect.states.get_state_result`
- Task parameter persistence (worker logic): `prefect.task_worker.read_parameters`

If you prefer table-structured inputs instead of markdown, swap to `create_table_artifact` and pass the rows as its `table` argument, e.g. `table=[{"input": input_, "seed": seed}]`.
k
@Marvin OK, let's try something else and let's use `mocker.spy` to spy on the tasks, and use that to find the arguments.
m
thought for 64.1 seconds
Spying will work well here, since you’re running locally with `ThreadPoolTaskRunner` and the task function actually executes in-process.

Key idea: spy on the task’s underlying function via the Task’s `.fn` attribute. Then inspect `.call_args_list` (or `.await_args_list` for async tasks) after the flow finishes.

Example with pytest-mock:
import inspect
import pytest
from prefect import task, flow
from prefect.task_runners import ThreadPoolTaskRunner

# import your real tasks instead of these placeholders
@task
def generate_rollout(input_, seed):
    return f"{input_}-{seed}"

@task
def choose_rollout(results):
    return max(results)

@flow(task_runner=ThreadPoolTaskRunner(max_workers=3))
async def trigger_flow():
    seeds = [1008, 1013, 2021, 1034, 1054]
    input_ = "X"
    results = generate_rollout.map([input_] * len(seeds), seeds)
    return choose_rollout(results)

def _args_list_for_spy(spy):
    # Supports both sync and async task functions
    if hasattr(spy, "await_args_list") and spy.await_args_list:
        calls = spy.await_args_list
    else:
        calls = spy.call_args_list
    return [(c.args, c.kwargs) for c in calls]

@pytest.mark.asyncio
async def test_spy_task_inputs(mocker):
    # IMPORTANT: spy on the same objects the flow uses
    gen_spy = mocker.spy(generate_rollout, "fn")
    choose_spy = mocker.spy(choose_rollout, "fn")

    result = await trigger_flow()

    # Inspect generate_rollout calls (mapping -> multiple calls)
    gen_calls = _args_list_for_spy(gen_spy)
    # Each entry is (args, kwargs); args == (input_, seed)
    for (args, kwargs) in gen_calls:
        input_, seed = args  # kwargs should be {}
        print("generate_rollout called with:", input_, seed)

    # Inspect choose_rollout calls
    choose_calls = _args_list_for_spy(choose_spy)
    # Typically one call; args[0] is the list of mapped results
    for (args, kwargs) in choose_calls:
        results_list = args[0]
        print("choose_rollout called with:", results_list)

    # Assertions as needed
    assert result is not None
    assert len(gen_calls) == 5
Notes and tips:
- Use `.fn` for the spy. Prefect tasks are callable objects; the user function that actually runs is stored at `task.fn`.
- If your task function is async, `pytest-mock` exposes `spy.await_args_list`. The helper `_args_list_for_spy` above handles both sync and async cases.
- Make sure you spy on the exact object your flow uses. If your flow imports `generate_rollout` from `my_module`, then in the test do `from my_module import generate_rollout` and spy on that imported object.
- Mapping will trigger multiple calls to the task function; you’ll see one entry per map item.
- Retries will appear as additional calls; if you only care about the first or last call, index accordingly.

This gives you precise visibility into the inputs passed to each task without needing to persist parameters or write artifacts. You can combine this with your existing state-result loading if you still want to assert on outputs recorded by Prefect.
k
nice, thx!