Hey all, I'm setting up prefect tests. I got the v...
# ask-community
k
Hey all, I'm setting up prefect tests. I got the very basics working, but I wonder if it's possible to get the in- and outputs of the individual tasks of a flow:
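import pytest
from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter
from prefect.client.schemas.objects import FlowRun, TaskRun

# trigger_flow() is defined elsewhere in the test suite (not shown in this thread)
@pytest.mark.asyncio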
async def test_my_favorite_flow():
    result = await trigger_flow()

    async with get_client() as client:
        flow_runs: list[FlowRun] = await client.read_flow_runs()
        if not flow_runs:
            raise RuntimeError("No flow runs found")
        flow_run: FlowRun = flow_runs[0]

        task_runs: list[TaskRun] = await client.read_task_runs(
            flow_run_filter=FlowRunFilter.model_validate({"id": {"any_": [str(flow_run.id)]}}),
            limit=200,
        )
So this gets the task runs of a flow run, but now, how can I get the arguments and return values of the task runs?
n
hi @Kurt Sys (Vintecc) - not from the API, bc task run inputs are not persisted server side / available via the API
k
Thx, so this means I should store them as an artifact, if I want to know them? (But there's no such thing as e.g. a json artifact?) There's no way to store the inputs somehow?
n
it entirely depends on how you're writing the tests
re "a json artifact": variables serve this purpose
but if you're just trying to see what you're passing to a callable, then a spy pattern using pytest/unittest is common (ie you don't have to write to the api if you're writing unit tests)
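For reference, the spy pattern mentioned here can be sketched with nothing but `unittest.mock`. The `scale` and `pipeline` functions below are hypothetical stand-ins, not anything from the thread; the point is only that `MagicMock(wraps=...)` records each call while still running the real function.

```python
from unittest.mock import MagicMock


def scale(value, factor=2):
    """Hypothetical function under test."""
    return value * factor


def pipeline(step):
    """Hypothetical caller that invokes whatever step it is given."""
    return step(5, factor=3)


# wraps= forwards every call to the real function while recording it
spy = MagicMock(wraps=scale)
result = pipeline(spy)

# The spy remembers exactly how it was called
spy.assert_called_once_with(5, factor=3)
recorded_args = spy.call_args.args
recorded_kwargs = spy.call_args.kwargs
```

Nothing is written to any API here; the assertions run purely against what the mock recorded in memory.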
k
Hey @Nate, right, so I don't actually need the data to be stored as such (by prefect), but using `mocker.spy` seems to still be empty, probably because I'm using it wrong here:
generate_rollout_spy = mocker.spy(generate_rollout, "__call__")
I guess the `__call__` is not what I should use here? Are there any examples or docs on how 'fetching the task args (and results) in tests' can be done?
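A likely reason the `__call__` spy stays empty: Python looks up special methods such as `__call__` on the type, not on the instance, so a replacement attached to a single object is skipped when that object is called with `obj(...)`. The sketch below shows this with a plain class; `Doubler` is a hypothetical stand-in, not a Prefect task.

```python
from unittest.mock import MagicMock


class Doubler:
    """Hypothetical callable object standing in for a decorated task."""

    def __call__(self, x):
        return x * 2


obj = Doubler()

# Attach a spy as an *instance* attribute named __call__
spy = MagicMock(wraps=obj.__call__)
obj.__call__ = spy

# obj(3) resolves __call__ on type(obj), so the instance-level spy is bypassed
result = obj(3)
implicit_calls = spy.call_count

# Only an explicit attribute access goes through the spy
obj.__call__(3)
explicit_calls = spy.call_count
```

Spying on an ordinary attribute that the object calls internally avoids this lookup rule.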
n
i am unfamiliar with the details of how you're testing but i will say that LLMs are pretty good at these things @Marvin wanna give a quick rundown on how to impl a spy pattern to assert task run params in prefect?
m
Totally, here are a few quick, reliable “spy” patterns for asserting Prefect task call params in Prefect 3.x. They all work locally and with the engine; pick the one that best fits your test style.

Pattern A: Patch the task’s underlying function with a MagicMock spy
- Prefect executes the underlying Python function stored on the task; spying on that function captures the actual args/kwargs used by the engine.
- Works great with threads; if you use processes, patch the symbol in the module under test so it’s visible in the child process.
# my_flow.py
from prefect import flow, task

@task
def add(x, y=2):
    return x + y

@flow
def my_flow():
    return add(1, y=3)

# test_my_flow.py
from unittest.mock import MagicMock
import inspect
import my_flow

def test_add_called_with_expected_params():
    # capture the original signature to normalize args->param names
    orig_sig = inspect.signature(my_flow.add.fn)

    spy = MagicMock(wraps=my_flow.add.fn)
    my_flow.add.fn = spy  # patch the function that the task actually executes

    my_flow.my_flow()

    spy.assert_called_once()
    ba = orig_sig.bind_partial(*spy.call_args.args, **spy.call_args.kwargs)
    assert ba.arguments == {"x": 1, "y": 3}
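One caveat with assigning the spy directly is that the replacement stays in place for later tests. A minimal sketch of the same idea using `patch.object` as a context manager, which restores the original on exit, is below. `FakeTask` is a hypothetical stand-in that simply calls whatever is stored in `.fn`; it is an assumption for illustration and not Prefect's implementation.

```python
from unittest.mock import patch


class FakeTask:
    """Stand-in for a task object (assumption): calls whatever is in .fn."""

    def __init__(self, fn):
        self.fn = fn

    def __call__(self, *args, **kwargs):
        return self.fn(*args, **kwargs)


def _add(x, y=2):
    return x + y


add = FakeTask(_add)
original_fn = add.fn

# patch.object swaps .fn for a MagicMock that wraps the real function,
# and puts the original back when the with-block ends
with patch.object(add, "fn", wraps=add.fn) as spy:
    result = add(1, y=3)
    spy.assert_called_once_with(1, y=3)

restored = add.fn is original_fn
```

With pytest-mock, `mocker.spy(obj, "fn")` gives similar cleanup automatically at the end of the test.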
Pattern B: Wrap the task function to record bound parameters via TaskRunContext
- This gives you the canonical parameters dict Prefect computed (no need to re-bind args).
# conftest.py or inline in test
import functools, inspect
from prefect.context import get_run_context

def spy_on_task(task_obj):
    calls = []
    original = task_obj.fn

    if inspect.iscoroutinefunction(original):
        @functools.wraps(original)
        async def wrapper(*a, **k):
            params = get_run_context().parameters.copy()
            calls.append(params)
            return await original(*a, **k)
    else:
        @functools.wraps(original)
        def wrapper(*a, **k):
            params = get_run_context().parameters.copy()
            calls.append(params)
            return original(*a, **k)

    task_obj.fn = wrapper
    return calls
Usage:
# test_my_flow.py
import my_flow
from .conftest import spy_on_task

def test_params_with_context_capture():
    calls = spy_on_task(my_flow.add)
    my_flow.my_flow()
    assert calls == [{"x": 1, "y": 3}]
Notes and gotchas
- Retries: Your spy will capture each retry as an additional call; assert on call count as needed.
- ProcessPoolTaskRunner: If you use processes, ensure the patched/wrapped task is defined in the module that child processes import (i.e., patch the symbol in the module under test before the pool starts).
- Keep task config intact: These approaches only wrap/replace the underlying function; they do not change task options like retries, timeouts, etc.

Related docs
- Tasks: Prefect tasks
- Context: get_run_context API
- Testing guide: Testing flows and tasks

If you share a snippet of your flow/task setup, I can tailor the spy wrapper to your exact structure (async/sync, mapping, etc.).
k
thx, got it working pretty elegantly now, thanks to marvin:
    generate_rollout_spy = mocker.spy(generate_rollout, "fn")

    result = await trigger_flow()

    gen_calls = _args_list_for_spy(generate_rollout_spy)
    for args, kwargs in gen_calls:
        input_, seed = args
        print("generate_rollout called with:", input_, seed)
        print("generate_rollout called with kwargs:", kwargs)

    print("output", generate_rollout_spy.spy_return_list)
n
catjam
k
the crux here was the `"fn"` part: needed to know some of the Prefect internals to get that one - or a Marvin to help me out.
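The `_args_list_for_spy` helper used in the snippet above is not shown in the thread. One plausible shape for it, offered purely as a guess, is a small function that turns a mock's recorded calls into `(args, kwargs)` pairs. The name `args_list_for_spy` and the sample calls below are hypothetical.

```python
from unittest.mock import MagicMock


def args_list_for_spy(spy):
    """Return [(args, kwargs), ...] for every call the mock/spy recorded.

    Hypothetical helper: the thread's own version is not shown.
    """
    return [(call.args, call.kwargs) for call in spy.call_args_list]


# Demo with a plain MagicMock in place of a real spy
spy = MagicMock(return_value="rollout")
spy("input-a", 42)
spy("input-b", 7, verbose=True)

calls = args_list_for_spy(spy)
```

Because a `mocker.spy` object exposes the usual mock attributes such as `call_args_list`, a helper like this should work on it as well.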