Kurt Sys (Vintecc)
09/24/2025, 6:16 PM@pytest.mark.asyncio
async def test_my_favorite_flow():
result = await trigger_flow()
async with get_client() as client:
flow_runs: list[FlowRun] = await client.read_flow_runs()
if not flow_runs:
raise RuntimeError("No flow runs found")
flow_run: FlowRun = flow_runs[0]
task_runs: list[TaskRun] = await client.read_task_runs(
flow_run_filter=FlowRunFilter.model_validate({"id": {"any_": [str(flow_run.id)]}}),
limit=200,
)
So this gets the task runs of a flow run, but now, how can I get the arguments and return values of the task runs?Nate
09/24/2025, 6:23 PMKurt Sys (Vintecc)
09/24/2025, 7:27 PMNate
09/24/2025, 7:28 PMNate
09/24/2025, 7:28 PMa json artifactvariables serve this purpose
Nate
09/24/2025, 7:29 PMKurt Sys (Vintecc)
09/24/2025, 7:57 PMmocker.spy
seems to still be empty, probably because I'm using it wrong here:
generate_rollout_spy = mocker.spy(generate_rollout, "__call__")
I guess the __call__
is not what I should use here?
Are there any examples or docs on how 'fetching the task args (and results) in tests' can be done?Nate
09/24/2025, 8:11 PMMarvin
09/24/2025, 8:11 PMMarvin
09/24/2025, 8:16 PM# my_flow.py
from prefect import flow, task
@task
def add(x, y=2):
return x + y
@flow
def my_flow():
return add(1, y=3)
# test_my_flow.py
from unittest.mock import MagicMock
import inspect
import my_flow
def test_add_called_with_expected_params():
# capture the original signature to normalize args->param names
orig_sig = inspect.signature(my_flow.add.fn)
spy = MagicMock(wraps=my_flow.add.fn)
my_flow.add.fn = spy # patch the function that the task actually executes
my_flow.my_flow()
spy.assert_called_once()
ba = orig_sig.bind_partial(*spy.call_args.args, **spy.call_args.kwargs)
assert ba.arguments == {"x": 1, "y": 3}
Pattern B: Wrap the task function to record bound parameters via TaskRunContext
- This gives you the canonical parameters dict Prefect computed (no need to re-bind args).
# conftest.py or inline in test
import functools, inspect
from prefect.context import get_run_context
def spy_on_task(task_obj):
calls = []
original = task_obj.fn
if inspect.iscoroutinefunction(original):
@functools.wraps(original)
async def wrapper(*a, **k):
params = get_run_context().parameters.copy()
calls.append(params)
return await original(*a, **k)
else:
@functools.wraps(original)
def wrapper(*a, **k):
params = get_run_context().parameters.copy()
calls.append(params)
return original(*a, **k)
task_obj.fn = wrapper
return calls
Usage:
# test_my_flow.py
import my_flow
from .conftest import spy_on_task
def test_params_with_context_capture():
calls = spy_on_task(my_flow.add)
my_flow.my_flow()
assert calls == [{"x": 1, "y": 3}]
Notes and gotchas
- Retries: Your spy will capture each retry as an additional call; assert on call count as needed.
- ProcessPoolTaskRunner: If you use processes, ensure the patched/wrapped task is defined in the module that child processes import (i.e., patch the symbol in the module under test before the pool starts).
- Keep task config intact: These approaches only wrap/replace the underlying function; they do not change task options like retries, timeouts, etc.
Related docs
- Tasks: Prefect tasks
- Context: get_run_context API
- Testing guide: Testing flows and tasks
If you share a snippet of your flow/task setup, I can tailor the spy wrapper to your exact structure (async/sync, mapping, etc.).Kurt Sys (Vintecc)
09/24/2025, 8:25 PMgenerate_rollout_spy = mocker.spy(generate_rollout, "fn")
result = await trigger_flow()
gen_calls = _args_list_for_spy(generate_rollout_spy)
for args, kwargs in gen_calls:
input_, seed = args
print("generate_rollout called with:", input_, seed)
print("generate_rollout called with kwargs:", kwargs)
print("output", generate_rollout_spy.spy_return_list)
Nate
09/24/2025, 8:25 PMKurt Sys (Vintecc)
09/24/2025, 8:26 PM"fn"
part: needed to know some of the Prefect internals to get that one - or a Marvin to help me out.